From 33151eb1eff624055e2aa50fb893f7719fab055f Mon Sep 17 00:00:00 2001 From: Tobias Weber Date: Thu, 19 Dec 2024 18:17:09 +0100 Subject: [PATCH] Enable workflow editing via websocket --- .../polypheny/db/workflow/dag/Workflow.java | 31 +++- .../db/workflow/dag/WorkflowImpl.java | 129 +++++++++++++++- .../dag/activities/ActivityWrapper.java | 8 +- .../polypheny/db/workflow/dag/edges/Edge.java | 10 +- .../engine/monitoring/ExecutionMonitor.java | 70 +++++++++ .../engine/scheduler/GlobalScheduler.java | 11 +- .../engine/scheduler/WorkflowScheduler.java | 16 +- .../db/workflow/models/ActivityModel.java | 6 +- .../workflow/models/requests/WsRequest.java | 116 +++++++++++++++ .../workflow/models/responses/WsResponse.java | 138 ++++++++++++++++++ .../websocket/CreateActivityRequest.java | 24 --- .../websocket/DeleteActivityRequest.java | 25 ---- .../workflow/models/websocket/WsRequest.java | 42 ------ .../db/workflow/session/AbstractSession.java | 133 ++++++++++++++--- .../db/workflow/session/ApiSession.java | 2 +- .../db/workflow/session/SessionManager.java | 4 +- .../db/workflow/session/UserSession.java | 95 ++++++++++-- .../workflow/session/WorkflowWebSocket.java | 20 ++- .../engine/scheduler/GlobalSchedulerTest.java | 2 +- .../scheduler/WorkflowSchedulerTest.java | 2 +- 20 files changed, 728 insertions(+), 156 deletions(-) create mode 100644 plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/requests/WsRequest.java create mode 100644 plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/responses/WsResponse.java delete mode 100644 plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/CreateActivityRequest.java delete mode 100644 plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/DeleteActivityRequest.java delete mode 100644 plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/WsRequest.java diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java index a3d4f4bb84..c4198993ff 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/Workflow.java @@ -16,9 +16,13 @@ package org.polypheny.db.workflow.dag; +import com.fasterxml.jackson.databind.JsonNode; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import javax.annotation.Nullable; import org.polypheny.db.algebra.type.AlgDataType; import org.polypheny.db.util.graph.AttributedDirectedGraph; import org.polypheny.db.workflow.dag.activities.ActivityException; @@ -28,7 +32,9 @@ import org.polypheny.db.workflow.dag.variables.VariableStore; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge; import org.polypheny.db.workflow.engine.storage.StorageManager; +import org.polypheny.db.workflow.models.ActivityConfigModel; import org.polypheny.db.workflow.models.EdgeModel; +import org.polypheny.db.workflow.models.RenderModel; import org.polypheny.db.workflow.models.WorkflowConfigModel; import org.polypheny.db.workflow.models.WorkflowModel; @@ -78,6 +84,8 @@ public interface Workflow { WorkflowConfigModel getConfig(); + void setConfig( WorkflowConfigModel config ); + WorkflowState getState(); VariableStore getVariables(); @@ -144,11 +152,28 @@ public interface Workflow { int getInPortCount( UUID activityId ); - void addActivity( ActivityWrapper activity ); - void deleteActivity( UUID activityId ); + Set getReachableActivities( UUID rootId, boolean includeRoot ); + + /** + * Resets the activity and all activities reachable from it. + * + * @param activityId target activity, or null if all activities should be reset + * @param sm the StorageManager to be used to delete any existing checkpoints for activities being reset + */ + void reset( UUID activityId, StorageManager sm ); + + void reset( StorageManager sm ); + + ActivityWrapper addActivity( String activityType, RenderModel renderModel ); + + void deleteActivity( UUID activityId, StorageManager sm ); + + void addEdge( EdgeModel model, StorageManager sm ); + + void deleteEdge( EdgeModel model, StorageManager sm ); - void deleteEdge( EdgeModel model ); + ActivityWrapper updateActivity( UUID activityId, @Nullable Map settings, @Nullable ActivityConfigModel config, @Nullable RenderModel rendering, StorageManager sm ); AttributedDirectedGraph toDag(); diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java index f6d20bb645..4afa596f47 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java @@ -18,14 +18,19 @@ import com.fasterxml.jackson.databind.JsonNode; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; import lombok.Getter; import lombok.Setter; import org.polypheny.db.algebra.type.AlgDataType; @@ -44,10 +49,13 @@ import org.polypheny.db.workflow.dag.variables.VariableStore; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge; import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory; +import org.polypheny.db.workflow.engine.scheduler.GraphUtils; import org.polypheny.db.workflow.engine.storage.StorageManager; +import org.polypheny.db.workflow.models.ActivityConfigModel; import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType; import org.polypheny.db.workflow.models.ActivityModel; import org.polypheny.db.workflow.models.EdgeModel; +import org.polypheny.db.workflow.models.RenderModel; import org.polypheny.db.workflow.models.WorkflowConfigModel; import org.polypheny.db.workflow.models.WorkflowModel; @@ -56,7 +64,8 @@ public class WorkflowImpl implements Workflow { private final Map activities; private final Map, List> edges; @Getter - private final WorkflowConfigModel config; + @Setter + private WorkflowConfigModel config; @Getter @Setter private WorkflowState state = WorkflowState.IDLE; @@ -75,7 +84,6 @@ private WorkflowImpl( Map activities, Map key = Pair.of( e.getFromId(), e.getToId() ); + Pair key = e.toPair(); List edgeList = edges.computeIfAbsent( key, k -> new ArrayList<>() ); edgeList.add( Edge.fromModel( e, activities ) ); } @@ -110,6 +118,11 @@ public ActivityWrapper getActivity( UUID activityId ) { } + private ActivityWrapper getActivityOrThrow( UUID activityId ) { + return Objects.requireNonNull( activities.get( activityId ), "Activity does not exist: " + activityId ); + } + + @Override public List getEdges() { return edges.values() @@ -264,7 +277,64 @@ public int getInPortCount( UUID activityId ) { @Override - public void addActivity( ActivityWrapper activity ) { + public Set getReachableActivities( UUID rootId, boolean includeRoot ) { + Set visited = new HashSet<>(); + Queue open = new LinkedList<>( List.of( rootId ) ); + while ( !open.isEmpty() ) { + UUID n = open.remove(); + if ( visited.contains( n ) ) { + continue; + } + visited.add( n ); + getOutEdges( n ).forEach( e -> open.add( e.getTo().getId() ) ); + } + + if ( !includeRoot ) { + visited.remove( rootId ); + } + return visited; + } + + + private void resetAll( Collection activities, StorageManager sm ) { + AttributedDirectedGraph subDag = GraphUtils.getInducedSubgraph( toDag(), activities ); + for ( UUID n : TopologicalOrderIterator.of( subDag ) ) { + ActivityWrapper wrapper = this.activities.get( n ); + wrapper.resetExecution(); + sm.dropCheckpoints( n ); + updatePreview( n ); + for ( ExecutionEdge e : subDag.getOutwardEdges( n ) ) { + getEdge( e ).resetExecution(); + } + } + } + + + @Override + public void reset( UUID activityId, StorageManager sm ) { + if ( activityId == null ) { + reset( sm ); + return; + } + resetAll( getReachableActivities( activityId, true ), sm ); + } + + + @Override + public void reset( StorageManager sm ) { + resetAll( getActivities().stream().map( ActivityWrapper::getId ).toList(), sm ); + } + + + @Override + public ActivityWrapper addActivity( String activityType, RenderModel renderModel ) { + ActivityWrapper wrapper = ActivityWrapper.fromModel( new ActivityModel( activityType, renderModel ) ); + addActivity( wrapper ); + return wrapper; + } + + + private void addActivity( ActivityWrapper activity ) { if ( activities.containsKey( activity.getId() ) ) { throw new GenericRuntimeException( "Cannot add activity instance that is already part of this workflow." ); } @@ -274,20 +344,63 @@ public void addActivity( ActivityWrapper activity ) { @Override - public void deleteActivity( UUID activityId ) { + public void deleteActivity( UUID activityId, StorageManager sm ) { + Set reachable = getReachableActivities( activityId, false ); edges.entrySet().removeIf( entry -> entry.getKey().left.equals( activityId ) || entry.getKey().right.equals( activityId ) ); activities.remove( activityId ); + sm.dropCheckpoints( activityId ); + resetAll( reachable, sm ); + } + + + @Override + public void addEdge( EdgeModel model, StorageManager sm ) { + if ( getEdge( model ) != null ) { + throw new GenericRuntimeException( "Cannot add an edge that is already part of this workflow." ); + } + Edge edge = Edge.fromModel( model, activities ); + edges.computeIfAbsent( model.toPair(), k -> new ArrayList<>() ).add( edge ); + reset( edge.getTo().getId(), sm ); } @Override - public void deleteEdge( EdgeModel model ) { + public void deleteEdge( EdgeModel model, StorageManager sm ) { + Edge edge = getEdge( model ); List edgeList = edges.get( model.toPair() ); if ( edgeList == null ) { return; } - edgeList.removeIf( e -> e.isEquivalent( model ) ); - // TODO: reset target activity and all successors, update previews + edgeList.remove( edge ); + reset( edge.getTo().getId(), sm ); + } + + + @Override + public ActivityWrapper updateActivity( UUID activityId, @Nullable Map settings, @Nullable ActivityConfigModel config, @Nullable RenderModel rendering, StorageManager sm ) { + ActivityWrapper wrapper = getActivityOrThrow( activityId ); + if ( rendering != null ) { + wrapper.setRendering( rendering ); + } + + boolean requiresReset = false; + if ( config != null ) { + requiresReset = !wrapper.getConfig().equals( config ); + wrapper.setConfig( config ); + } + if ( settings != null ) { + requiresReset = requiresReset || !wrapper.getSerializableSettings().equals( settings ); + if ( settings.isEmpty() ) { + wrapper.resetSettings(); + } else { + wrapper.updateSettings( settings ); + } + } + + if ( requiresReset ) { + reset( activityId, sm ); + } + return wrapper; } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityWrapper.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityWrapper.java index 75a4b6ecc5..ee95e05e40 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityWrapper.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/ActivityWrapper.java @@ -79,6 +79,12 @@ public void updateSettings( Map newSettings ) { } + public void resetSettings() { + serializableSettings.clear(); + serializableSettings.putAll( ActivityRegistry.getSerializableDefaultSettings( type ) ); + } + + public Settings resolveSettings() throws InvalidSettingException { return ActivityRegistry.buildSettingValues( type, variables.resolveVariables( serializableSettings ) ); } @@ -156,7 +162,7 @@ public void resetExecution() { public static ActivityWrapper fromModel( ActivityModel model ) { String type = model.getType(); - // ensure the default value is used for missing settings + // ensuring the default value is used for missing settings Map settings = new HashMap<>( ActivityRegistry.getSerializableDefaultSettings( type ) ); settings.putAll( model.getSettings() ); return new ActivityWrapper( model.getId(), ActivityRegistry.activityFromType( type ), type, settings, model.getConfig(), model.getRendering() ); diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/Edge.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/Edge.java index ef61365a01..e3944b9cbd 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/Edge.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/edges/Edge.java @@ -17,6 +17,7 @@ package org.polypheny.db.workflow.dag.edges; import java.util.Map; +import java.util.Objects; import java.util.UUID; import lombok.Getter; import lombok.Setter; @@ -43,8 +44,8 @@ public Edge( ActivityWrapper from, ActivityWrapper to ) { public static Edge fromModel( EdgeModel model, Map activities ) { - ActivityWrapper from = activities.get( model.getFromId() ); - ActivityWrapper to = activities.get( model.getToId() ); + ActivityWrapper from = Objects.requireNonNull( activities.get( model.getFromId() ), "Cannot create edge from an unknown source activity" ); + ActivityWrapper to = Objects.requireNonNull( activities.get( model.getToId() ), "Cannot create edge to an unknown target activity" ); if ( model.isControl() ) { return new ControlEdge( from, to, model.getFromPort() ); } else { @@ -84,6 +85,11 @@ public boolean isIgnored() { } + public void resetExecution() { + state = EdgeState.IDLE; + } + + public enum EdgeState { IDLE, ACTIVE, diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/monitoring/ExecutionMonitor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/monitoring/ExecutionMonitor.java index 3de0bc3149..3209b97162 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/monitoring/ExecutionMonitor.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/monitoring/ExecutionMonitor.java @@ -23,11 +23,55 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import lombok.Getter; +import org.apache.commons.lang3.time.StopWatch; +import org.polypheny.db.workflow.dag.Workflow; +import org.polypheny.db.workflow.models.responses.WsResponse; +import org.polypheny.db.workflow.models.responses.WsResponse.ProgressUpdateResponse; +import org.polypheny.db.workflow.models.responses.WsResponse.StateUpdateResponse; public class ExecutionMonitor { + private static final int UPDATE_PROGRESS_DELAY = 2000; + private static final int STATE_UPDATE_DEFER_DELAY = 200; + private final List infos = new CopyOnWriteArrayList<>(); private final Map activityToInfoMap = new ConcurrentHashMap<>(); + private final StopWatch workflowDuration; + + private final Workflow workflow; + @Getter + private final UUID targetActivity; + private final Consumer callback; // used to send updates to clients + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private volatile StateUpdateResponse scheduledUpdate; // Reduce number of state updates we send to the clients + + + public ExecutionMonitor( Workflow workflow, @Nullable UUID targetActivity, @Nullable Consumer callback ) { + this.workflow = workflow; + this.targetActivity = targetActivity; + this.workflowDuration = StopWatch.createStarted(); + this.callback = callback; + if ( callback != null ) { + startPeriodicUpdates(); + } + } + + + private void startPeriodicUpdates() { + scheduler.scheduleAtFixedRate( () -> { + try { + callback.accept( new ProgressUpdateResponse( null, getAllProgress() ) ); + } catch ( Exception e ) { + e.printStackTrace(); // Log any exceptions + } + }, 500, UPDATE_PROGRESS_DELAY, TimeUnit.MILLISECONDS ); + } public void addInfo( ExecutionInfo info ) { @@ -58,4 +102,30 @@ public Map getAllProgress() { return Collections.unmodifiableMap( progressMap ); } + + public void stop() { + workflowDuration.stop(); + forwardStates(); + scheduler.shutdown(); + } + + + public long getWorkflowDurationMillis() { + return workflowDuration.getDuration().toMillis(); + } + + + public synchronized void forwardStates() { + if ( callback != null ) { + boolean isScheduled = scheduledUpdate != null; + scheduledUpdate = new StateUpdateResponse( null, workflow ); + if ( !isScheduled ) { + scheduler.schedule( () -> { + callback.accept( scheduledUpdate ); + scheduledUpdate = null; + }, STATE_UPDATE_DEFER_DELAY, TimeUnit.MILLISECONDS ); + } + } + } + } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java index 109f765e04..1d56a55729 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java @@ -29,6 +29,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.polypheny.db.catalog.exceptions.GenericRuntimeException; @@ -39,6 +40,7 @@ import org.polypheny.db.workflow.engine.monitoring.ExecutionMonitor; import org.polypheny.db.workflow.engine.storage.StorageManager; import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType; +import org.polypheny.db.workflow.models.responses.WsResponse; @Slf4j public class GlobalScheduler { @@ -71,13 +73,18 @@ public static GlobalScheduler getInstance() { } - public synchronized ExecutionMonitor startExecution( Workflow workflow, StorageManager sm, @Nullable UUID targetActivity ) throws Exception { + public synchronized void startExecution( Workflow workflow, StorageManager sm, @Nullable UUID targetActivity ) throws Exception { + startExecution( workflow, sm, targetActivity, null ); + } + + + public synchronized ExecutionMonitor startExecution( Workflow workflow, StorageManager sm, @Nullable UUID targetActivity, Consumer monitoringCallback ) throws Exception { UUID sessionId = sm.getSessionId(); if ( schedulers.containsKey( sessionId ) ) { throw new GenericRuntimeException( "Cannot execute a workflow that is already being executed." ); } interruptedSessions.remove( sessionId ); - ExecutionMonitor monitor = new ExecutionMonitor(); + ExecutionMonitor monitor = new ExecutionMonitor( workflow, targetActivity, monitoringCallback ); WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, monitor, GLOBAL_WORKERS, targetActivity ); List submissions = scheduler.startExecution(); if ( submissions.isEmpty() ) { diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java index f6edaded56..c20677e9bc 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/WorkflowScheduler.java @@ -102,6 +102,7 @@ public WorkflowScheduler( Workflow workflow, StorageManager sm, ExecutionMonitor public List startExecution() { + executionMonitor.forwardStates(); return computeNextSubmissions(); } @@ -223,21 +224,19 @@ public List handleExecutionResult( ExecutionResult result ) updateGraph( result.isSuccess(), result.getActivities(), result.getRootId(), execDag ); updatePartitions(); + executionMonitor.forwardStates(); log.warn( "Remaining activities: " + remainingActivities ); if ( remainingActivities.isEmpty() ) { assert pendingCount == 0; - - isFinished = true; - workflow.setState( WorkflowState.IDLE ); + setFinished(); return null; } if ( isAborted ) { if ( pendingCount == 0 ) { - isFinished = true; - workflow.setState( WorkflowState.IDLE ); + setFinished(); } return null; } @@ -386,6 +385,13 @@ private void updatePartitions() { } + private void setFinished() { + workflow.setState( WorkflowState.IDLE ); + executionMonitor.stop(); + isFinished = true; + } + + private void setStates( Set activities, ActivityState state ) { activities.forEach( id -> workflow.getActivity( id ).setState( state ) ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java index 80109f6b9e..e419672899 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/ActivityModel.java @@ -40,12 +40,12 @@ public class ActivityModel { public ActivityModel( String type ) { - this( type, UUID.randomUUID() ); + this( type, RenderModel.of() ); } - public ActivityModel( String type, UUID id ) { - this( type, id, ActivityRegistry.getSerializableDefaultSettings( type ), ActivityConfigModel.of(), RenderModel.of(), null ); + public ActivityModel( String type, RenderModel renderModel ) { + this( type, UUID.randomUUID(), ActivityRegistry.getSerializableDefaultSettings( type ), ActivityConfigModel.of(), renderModel, null ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/requests/WsRequest.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/requests/WsRequest.java new file mode 100644 index 0000000000..bfe756035b --- /dev/null +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/requests/WsRequest.java @@ -0,0 +1,116 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow.models.requests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Map; +import java.util.UUID; +import javax.annotation.Nullable; +import org.polypheny.db.workflow.models.ActivityConfigModel; +import org.polypheny.db.workflow.models.EdgeModel; +import org.polypheny.db.workflow.models.RenderModel; +import org.polypheny.db.workflow.models.WorkflowConfigModel; + +/** + * The structure of workflows is modified with requests sent over the websocket. + */ +public class WsRequest { + + public RequestType type; + public UUID msgId; + + + public enum RequestType { + CREATE_ACTIVITY, + DELETE_ACTIVITY, + UPDATE_ACTIVITY, + CREATE_EDGE, + DELETE_EDGE, + EXECUTE, + INTERRUPT, + RESET, + UPDATE_CONFIG // workflow config + } + + + public static class CreateActivityRequest extends WsRequest { + + public String activityType; + public RenderModel rendering; + + } + + + public static class UpdateActivityRequest extends WsRequest { + + public UUID targetId; + + // only the fields to be updated are non-null + public @Nullable Map settings; // an empty map resets all settings to their default values + public @Nullable ActivityConfigModel config; + public @Nullable RenderModel rendering; + + } + + + public static class DeleteActivityRequest extends WsRequest { + + public UUID targetId; + + } + + + public static class CreateEdgeRequest extends WsRequest { + + public EdgeModel edge; + + } + + + public static class DeleteEdgeRequest extends WsRequest { + + public EdgeModel edge; + + } + + + public static class ExecuteRequest extends WsRequest { + + public @Nullable UUID targetId; // or null to execute all + + } + + + public static class InterruptRequest extends WsRequest { + + } + + + public static class ResetRequest extends WsRequest { + + public @Nullable UUID rootId; // or null to reset all + + } + + + public static class UpdateConfigRequest extends WsRequest { + + public WorkflowConfigModel workflowConfig; + + } + +} diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/responses/WsResponse.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/responses/WsResponse.java new file mode 100644 index 0000000000..15efdcc7c4 --- /dev/null +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/responses/WsResponse.java @@ -0,0 +1,138 @@ +/* + * Copyright 2019-2024 The Polypheny Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.polypheny.db.workflow.models.responses; + +import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.ACTIVITY_UPDATE; +import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.PROGRESS_UPDATE; +import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.RENDERING_UPDATE; +import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.STATE_UPDATE; +import static org.polypheny.db.workflow.models.responses.WsResponse.ResponseType.WORKFLOW_UPDATE; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.polypheny.db.workflow.dag.Workflow; +import org.polypheny.db.workflow.dag.Workflow.WorkflowState; +import org.polypheny.db.workflow.dag.activities.ActivityWrapper; +import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState; +import org.polypheny.db.workflow.models.ActivityModel; +import org.polypheny.db.workflow.models.EdgeModel; +import org.polypheny.db.workflow.models.RenderModel; +import org.polypheny.db.workflow.models.WorkflowModel; + +/** + * The structure of workflows is modified with requests sent over the websocket. + */ +public class WsResponse { + + public final ResponseType type; + public final UUID msgId; + public final UUID parentId; + + + public WsResponse( ResponseType type, @Nullable UUID parentId ) { + this.type = type; + this.parentId = parentId; + this.msgId = UUID.randomUUID(); + } + + + public enum ResponseType { + WORKFLOW_UPDATE, // entire workflow + ACTIVITY_UPDATE, // single activity + RENDERING_UPDATE, // only renderModel of an activity + STATE_UPDATE, // all edge and activity states + PROGRESS_UPDATE, + } + + + public static class WorkflowUpdateResponse extends WsResponse { + + public final WorkflowModel workflow; + + + public WorkflowUpdateResponse( @Nullable UUID parentId, WorkflowModel workflow ) { + super( WORKFLOW_UPDATE, parentId ); + this.workflow = workflow; + } + + } + + + public static class ActivityUpdateResponse extends WsResponse { + + public final ActivityModel activity; + + + public ActivityUpdateResponse( @Nullable UUID parentId, ActivityWrapper activity ) { + super( ACTIVITY_UPDATE, parentId ); + this.activity = activity.toModel( true ); + } + + } + + + public static class RenderingUpdateResponse extends WsResponse { + + public final UUID activityId; + public final RenderModel rendering; + + + public RenderingUpdateResponse( @Nullable UUID parentId, ActivityWrapper wrapper ) { + super( RENDERING_UPDATE, parentId ); + this.activityId = wrapper.getId(); + this.rendering = wrapper.getRendering(); + } + + } + + + public static class StateUpdateResponse extends WsResponse { + + public final WorkflowState workflowState; + public final Map activityStates; + public final List edgeStates; + + + public StateUpdateResponse( @Nullable UUID parentId, Workflow workflow ) { + super( STATE_UPDATE, parentId ); + workflowState = workflow.getState(); + activityStates = workflow.getActivities().stream().collect( Collectors.toMap( + ActivityWrapper::getId, + ActivityWrapper::getState + ) ); + edgeStates = workflow.getEdges().stream().map( e -> e.toModel( true ) ).toList(); + } + + } + + + public static class ProgressUpdateResponse extends WsResponse { + + public final Map progress; + + + public ProgressUpdateResponse( @Nullable UUID parentId, Map progress ) { + super( PROGRESS_UPDATE, parentId ); + this.progress = progress; + } + + } + +} diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/CreateActivityRequest.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/CreateActivityRequest.java deleted file mode 100644 index 23ee7101e2..0000000000 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/CreateActivityRequest.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2019-2024 The Polypheny Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.polypheny.db.workflow.models.websocket; - -public class CreateActivityRequest extends WsRequest { - - public String activityType; - //public int xPos; - //public int yPos; -} diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/DeleteActivityRequest.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/DeleteActivityRequest.java deleted file mode 100644 index 1877e50b46..0000000000 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/DeleteActivityRequest.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2019-2024 The Polypheny Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.polypheny.db.workflow.models.websocket; - -import java.util.UUID; - -public class DeleteActivityRequest extends WsRequest { - - public UUID targetId; - -} diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/WsRequest.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/WsRequest.java deleted file mode 100644 index 5727f61bbe..0000000000 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/models/websocket/WsRequest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2019-2024 The Polypheny Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.polypheny.db.workflow.models.websocket; - -import java.util.UUID; - -/** - * The structure of workflows is modified with requests sent over the websocket. - */ -public class WsRequest { - - public RequestType type; - public UUID msgId; - - - public enum RequestType { - CREATE_ACTIVITY, - DELETE_ACTIVITY, - UPDATE_ACTIVITY, - CREATE_EDGE, - DELETE_EDGE, - EXECUTE, // execute a single target activity - EXECUTE_ALL, - INTERRUPT, - - } - -} diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/AbstractSession.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/AbstractSession.java index 28fc05ecf9..429acd294a 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/AbstractSession.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/AbstractSession.java @@ -16,36 +16,54 @@ package org.polypheny.db.workflow.session; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; import java.util.HashSet; import java.util.Set; import java.util.UUID; import javax.annotation.Nullable; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.NotImplementedException; import org.eclipse.jetty.websocket.api.Session; +import org.polypheny.db.catalog.exceptions.GenericRuntimeException; import org.polypheny.db.workflow.dag.Workflow; +import org.polypheny.db.workflow.dag.Workflow.WorkflowState; import org.polypheny.db.workflow.engine.monitoring.ExecutionMonitor; import org.polypheny.db.workflow.engine.scheduler.GlobalScheduler; import org.polypheny.db.workflow.engine.storage.StorageManager; import org.polypheny.db.workflow.engine.storage.StorageManagerImpl; import org.polypheny.db.workflow.models.SessionModel; -import org.polypheny.db.workflow.models.websocket.CreateActivityRequest; -import org.polypheny.db.workflow.models.websocket.DeleteActivityRequest; -import org.polypheny.db.workflow.models.websocket.WsRequest; - - +import org.polypheny.db.workflow.models.requests.WsRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.DeleteEdgeRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.ExecuteRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.InterruptRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.ResetRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.UpdateActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.UpdateConfigRequest; +import org.polypheny.db.workflow.models.responses.WsResponse; + +@Slf4j public abstract class AbstractSession { @Getter - final Workflow wf; - final UUID sId; + final Workflow workflow; + final UUID sessionId; final StorageManager sm; + final ObjectMapper mapper = new ObjectMapper(); private final Set subscribers = new HashSet<>(); + final GlobalScheduler scheduler; - protected AbstractSession( Workflow wf, UUID sId ) { - this.wf = wf; - this.sId = sId; - this.sm = new StorageManagerImpl( sId, wf.getConfig().getPreferredStores() ); + protected AbstractSession( Workflow workflow, UUID sessionId ) { + this.workflow = workflow; + this.sessionId = sessionId; + this.sm = new StorageManagerImpl( sessionId, workflow.getConfig().getPreferredStores() ); + this.scheduler = GlobalScheduler.getInstance(); } @@ -70,13 +88,45 @@ public void unsubscribe( Session session ) { public UUID getId() { - return sId; + return sessionId; } public abstract SessionModel toModel(); + void broadcastMessage( WsResponse msg ) { + try { + String json = mapper.writeValueAsString( msg ); + log.info( "Broadcasting message: " + json ); + for ( Session subscriber : subscribers ) { + try { + subscriber.getRemote().sendString( json ); + } catch ( IOException e ) { + subscriber.close(); + } + } + } catch ( JsonProcessingException e ) { + throw new RuntimeException( e ); + } + } + + + void startExecution( @Nullable UUID targetActivity ) { + // TODO: add execution request + + // TODO: implement correct error handling when execution cannot be started + // TODO: reset the activity and all its successors + throwIfNotIdle(); + try { + ExecutionMonitor executionMonitor = GlobalScheduler.getInstance().startExecution( workflow, sm, targetActivity, this::broadcastMessage ); + } catch ( Exception e ) { + throw new RuntimeException( e ); + } + throw new NotImplementedException(); + } + + public void handleRequest( CreateActivityRequest request ) { throwUnsupported( request ); } @@ -87,19 +137,62 @@ public void handleRequest( DeleteActivityRequest request ) { } - void startExecution( @Nullable UUID targetActivity ) { - // TODO: add execution request + public void handleRequest( UpdateActivityRequest request ) { + throwUnsupported( request ); + } - // TODO: implement correct error handling when execution cannot be started - try { - ExecutionMonitor executionMonitor = GlobalScheduler.getInstance().startExecution( wf, sm, targetActivity ); - } catch ( Exception e ) { - throw new RuntimeException( e ); + + public void handleRequest( CreateEdgeRequest request ) { + throwUnsupported( request ); + } + + + public void handleRequest( DeleteEdgeRequest request ) { + throwUnsupported( request ); + } + + + public void handleRequest( ExecuteRequest request ) { + throwUnsupported( request ); + } + + + public void handleRequest( InterruptRequest request ) { + throwUnsupported( request ); + } + + + public void handleRequest( ResetRequest request ) { + throwUnsupported( request ); + } + + + public void handleRequest( UpdateConfigRequest request ) { + throwUnsupported( request ); + } + + + void interruptExecution() { + throwIfNotExecuting(); + scheduler.interruptExecution( sessionId ); + } + + + void throwIfNotIdle() { + if ( workflow.getState() != WorkflowState.IDLE ) { + throw new GenericRuntimeException( "Workflow is currently not in an idle state." ); + } + } + + + void throwIfNotExecuting() { + if ( workflow.getState() != WorkflowState.EXECUTING ) { + throw new GenericRuntimeException( "Workflow is currently not being executed." ); } } - private void throwUnsupported( WsRequest request ) { + void throwUnsupported( WsRequest request ) { throw new UnsupportedOperationException( "This session type does not support " + request.type + " requests." ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/ApiSession.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/ApiSession.java index cd98dce5d6..1d75cba327 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/ApiSession.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/ApiSession.java @@ -37,7 +37,7 @@ public void terminate() { @Override public SessionModel toModel() { - return new SessionModel( SessionModelType.API_SESSION, sId ); + return new SessionModel( SessionModelType.API_SESSION, sessionId ); } } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/SessionManager.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/SessionManager.java index 1c3e616040..9d3b6885ff 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/SessionManager.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/SessionManager.java @@ -96,7 +96,7 @@ public WorkflowModel getActiveWorkflowModel( UUID sId ) { // TODO: handle invalid sId return null; } - return session.getWf().toModel( true ); + return session.getWorkflow().toModel( true ); } @@ -117,7 +117,7 @@ public void saveUserSession( UUID sId, String versionDesc ) throws WorkflowRepoE // TODO: handle invalid sId return; } - int version = repo.writeVersion( session.getWId(), versionDesc, session.getWf().toModel( false ) ); + int version = repo.writeVersion( session.getWId(), versionDesc, session.getWorkflow().toModel( false ) ); session.setOpenedVersion( version ); } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/UserSession.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/UserSession.java index 46337c19b2..c2b5e7d7b7 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/UserSession.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/UserSession.java @@ -16,17 +16,30 @@ package org.polypheny.db.workflow.session; +import java.util.Map.Entry; import java.util.UUID; import lombok.Getter; import lombok.Setter; import org.apache.commons.lang3.NotImplementedException; import org.polypheny.db.catalog.exceptions.GenericRuntimeException; +import org.polypheny.db.catalog.logistic.DataModel; import org.polypheny.db.workflow.dag.Workflow; import org.polypheny.db.workflow.dag.Workflow.WorkflowState; +import org.polypheny.db.workflow.dag.activities.ActivityWrapper; import org.polypheny.db.workflow.models.SessionModel; import org.polypheny.db.workflow.models.SessionModel.SessionModelType; -import org.polypheny.db.workflow.models.websocket.CreateActivityRequest; -import org.polypheny.db.workflow.models.websocket.DeleteActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.DeleteEdgeRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.ExecuteRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.InterruptRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.ResetRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.UpdateActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.UpdateConfigRequest; +import org.polypheny.db.workflow.models.responses.WsResponse.ActivityUpdateResponse; +import org.polypheny.db.workflow.models.responses.WsResponse.RenderingUpdateResponse; +import org.polypheny.db.workflow.models.responses.WsResponse.StateUpdateResponse; public class UserSession extends AbstractSession { @@ -46,41 +59,97 @@ public UserSession( UUID sessionId, Workflow wf, UUID workflowId, int openedVers @Override public void terminate() { - + throw new NotImplementedException(); } @Override public synchronized void handleRequest( CreateActivityRequest request ) { throwIfNotEditable(); - throw new NotImplementedException(); - //Activity activity = Activity.fromType(request.activityType); - //wf.addActivity(activity); + ActivityWrapper activity = workflow.addActivity( request.activityType, request.rendering ); + broadcastMessage( new ActivityUpdateResponse( request.msgId, activity ) ); + } - // send updates to subscriber + @Override + public synchronized void handleRequest( DeleteActivityRequest request ) { + throwIfNotEditable(); + workflow.deleteActivity( request.targetId, sm ); + broadcastMessage( new StateUpdateResponse( request.msgId, workflow ) ); } @Override - public synchronized void handleRequest( DeleteActivityRequest request ) { - // TODO: propagate state changes and reset of nodes in workflow + public void handleRequest( UpdateActivityRequest request ) { throwIfNotEditable(); - wf.deleteActivity( request.targetId ); - throw new NotImplementedException(); + ActivityWrapper activity = workflow.updateActivity( request.targetId, request.settings, request.config, request.rendering, sm ); + + if (request.rendering != null && request.settings == null && request.config == null) { + broadcastMessage( new RenderingUpdateResponse( request.msgId, activity ) ); + return; + } + broadcastMessage( new StateUpdateResponse( request.msgId, workflow ) ); + broadcastMessage( new ActivityUpdateResponse( request.msgId, activity ) ); + } + + + @Override + public void handleRequest( CreateEdgeRequest request ) { + throwIfNotEditable(); + workflow.addEdge( request.edge, sm ); + broadcastMessage( new StateUpdateResponse( request.msgId, workflow ) ); + } + + + @Override + public void handleRequest( DeleteEdgeRequest request ) { + throwIfNotEditable(); + } + + + @Override + public void handleRequest( UpdateConfigRequest request ) { + throwIfNotEditable(); + workflow.setConfig( request.workflowConfig ); + for ( Entry entry : request.workflowConfig.getPreferredStores().entrySet() ) { + sm.setDefaultStore( entry.getKey(), entry.getValue() ); + } + // broadcasting the updated config is not required + } + + + @Override + public void handleRequest( ResetRequest request ) { + throwIfNotEditable(); + workflow.reset( request.rootId, sm ); + broadcastMessage( new StateUpdateResponse( request.msgId, workflow ) ); + } + + + @Override + public void handleRequest( ExecuteRequest request ) { + throwIfNotEditable(); + startExecution( request.targetId ); + } + + + @Override + public void handleRequest( InterruptRequest request ) { + throwIfNotExecuting(); + interruptExecution(); } @Override public SessionModel toModel() { - return new SessionModel( SessionModelType.USER_SESSION, sId, wId, openedVersion ); + return new SessionModel( SessionModelType.USER_SESSION, sessionId, wId, openedVersion ); } private boolean isEditable() { // While we could perform this check within the workflow, we follow the approach where the workflow is not // aware of the semantic meaning of its state. - return wf.getState() == WorkflowState.IDLE; + return workflow.getState() == WorkflowState.IDLE; } diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/WorkflowWebSocket.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/WorkflowWebSocket.java index 03c556841c..cac7dd7952 100644 --- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/WorkflowWebSocket.java +++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/session/WorkflowWebSocket.java @@ -27,9 +27,16 @@ import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.polypheny.db.workflow.models.websocket.CreateActivityRequest; -import org.polypheny.db.workflow.models.websocket.DeleteActivityRequest; -import org.polypheny.db.workflow.models.websocket.WsRequest; +import org.polypheny.db.workflow.models.requests.WsRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.CreateActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.CreateEdgeRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.DeleteActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.DeleteEdgeRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.ExecuteRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.InterruptRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.ResetRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.UpdateActivityRequest; +import org.polypheny.db.workflow.models.requests.WsRequest.UpdateConfigRequest; @WebSocket public class WorkflowWebSocket implements Consumer { @@ -59,7 +66,14 @@ public void onMessage( final WsMessageContext ctx ) { switch ( baseRequest.type ) { case CREATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( CreateActivityRequest.class ) ); + case UPDATE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( UpdateActivityRequest.class ) ); case DELETE_ACTIVITY -> session.handleRequest( ctx.messageAsClass( DeleteActivityRequest.class ) ); + case CREATE_EDGE -> session.handleRequest( ctx.messageAsClass( CreateEdgeRequest.class ) ); + case DELETE_EDGE -> session.handleRequest( ctx.messageAsClass( DeleteEdgeRequest.class ) ); + case EXECUTE -> session.handleRequest( ctx.messageAsClass( ExecuteRequest.class ) ); + case INTERRUPT -> session.handleRequest( ctx.messageAsClass( InterruptRequest.class ) ); + case RESET -> session.handleRequest( ctx.messageAsClass( ResetRequest.class ) ); + case UPDATE_CONFIG -> session.handleRequest( ctx.messageAsClass( UpdateConfigRequest.class ) ); default -> throw new IllegalArgumentException( "Received request with unknown type!" ); } } diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java index df107e983e..01c0697606 100644 --- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java @@ -388,7 +388,7 @@ void executionMonitorTest() throws Exception { Workflow workflow = WorkflowUtils.getLongRunningPipe( delay ); List ids = WorkflowUtils.getTopologicalActivityIds( workflow ); System.out.println( "ids: " + ids ); - ExecutionMonitor monitor = scheduler.startExecution( workflow, sm, null ); + ExecutionMonitor monitor = scheduler.startExecution( workflow, sm, null, null ); for ( int i = 0; i < n; i++ ) { System.out.println( "\nProgress " + i ); diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/WorkflowSchedulerTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/WorkflowSchedulerTest.java index 4aa3c94c4e..f910cc49ab 100644 --- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/WorkflowSchedulerTest.java +++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/WorkflowSchedulerTest.java @@ -77,7 +77,7 @@ void singleActivityTest() throws Exception { assertEquals( WorkflowState.IDLE, workflow.getState() ); assertEquals( ActivityState.IDLE, workflow.getActivity( ids.get( 0 ) ).getState() ); - WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, new ExecutionMonitor(), globalWorkers, ids.get( 0 ) ); + WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, new ExecutionMonitor( workflow, ids.get( 0 ), null ), globalWorkers, ids.get( 0 ) ); assertEquals( WorkflowState.EXECUTING, workflow.getState() ); assertEquals( ActivityState.QUEUED, workflow.getActivity( ids.get( 0 ) ).getState() ); assertEquals( ActivityState.IDLE, workflow.getActivity( ids.get( 1 ) ).getState() );