Skip to content

Commit

Permalink
Enable workflow editing via websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 19, 2024
1 parent a88b67c commit 33151eb
Show file tree
Hide file tree
Showing 20 changed files with 728 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package org.polypheny.db.workflow.dag;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.util.graph.AttributedDirectedGraph;
import org.polypheny.db.workflow.dag.activities.ActivityException;
Expand All @@ -28,7 +32,9 @@
import org.polypheny.db.workflow.dag.variables.VariableStore;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.models.ActivityConfigModel;
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.RenderModel;
import org.polypheny.db.workflow.models.WorkflowConfigModel;
import org.polypheny.db.workflow.models.WorkflowModel;

Expand Down Expand Up @@ -78,6 +84,8 @@ public interface Workflow {

WorkflowConfigModel getConfig();

void setConfig( WorkflowConfigModel config );

WorkflowState getState();

VariableStore getVariables();
Expand Down Expand Up @@ -144,11 +152,28 @@ public interface Workflow {

int getInPortCount( UUID activityId );

void addActivity( ActivityWrapper activity );

void deleteActivity( UUID activityId );
Set<UUID> getReachableActivities( UUID rootId, boolean includeRoot );

/**
* Resets the activity and all activities reachable from it.
*
* @param activityId target activity, or null if all activities should be reset
* @param sm the StorageManager to be used to delete any existing checkpoints for activities being reset
*/
void reset( UUID activityId, StorageManager sm );

void reset( StorageManager sm );

ActivityWrapper addActivity( String activityType, RenderModel renderModel );

void deleteActivity( UUID activityId, StorageManager sm );

void addEdge( EdgeModel model, StorageManager sm );

void deleteEdge( EdgeModel model, StorageManager sm );

void deleteEdge( EdgeModel model );
ActivityWrapper updateActivity( UUID activityId, @Nullable Map<String, JsonNode> settings, @Nullable ActivityConfigModel config, @Nullable RenderModel rendering, StorageManager sm );

AttributedDirectedGraph<UUID, ExecutionEdge> toDag();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@

import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.Setter;
import org.polypheny.db.algebra.type.AlgDataType;
Expand All @@ -44,10 +49,13 @@
import org.polypheny.db.workflow.dag.variables.VariableStore;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory;
import org.polypheny.db.workflow.engine.scheduler.GraphUtils;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.models.ActivityConfigModel;
import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType;
import org.polypheny.db.workflow.models.ActivityModel;
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.RenderModel;
import org.polypheny.db.workflow.models.WorkflowConfigModel;
import org.polypheny.db.workflow.models.WorkflowModel;

Expand All @@ -56,7 +64,8 @@ public class WorkflowImpl implements Workflow {
private final Map<UUID, ActivityWrapper> activities;
private final Map<Pair<UUID, UUID>, List<Edge>> edges;
@Getter
private final WorkflowConfigModel config;
@Setter
private WorkflowConfigModel config;
@Getter
@Setter
private WorkflowState state = WorkflowState.IDLE;
Expand All @@ -75,7 +84,6 @@ private WorkflowImpl( Map<UUID, ActivityWrapper> activities, Map<Pair<UUID, UUID
this.config = config;
this.variables.reset( variables );

// TODO: compute previews & variables
TopologicalOrderIterator.of( toDag() ).forEach( this::updatePreview );
}

Expand All @@ -89,7 +97,7 @@ public static Workflow fromModel( WorkflowModel model ) {
activities.put( a.getId(), ActivityWrapper.fromModel( a ) );
}
for ( EdgeModel e : model.getEdges() ) {
Pair<UUID, UUID> key = Pair.of( e.getFromId(), e.getToId() );
Pair<UUID, UUID> key = e.toPair();
List<Edge> edgeList = edges.computeIfAbsent( key, k -> new ArrayList<>() );
edgeList.add( Edge.fromModel( e, activities ) );
}
Expand All @@ -110,6 +118,11 @@ public ActivityWrapper getActivity( UUID activityId ) {
}


private ActivityWrapper getActivityOrThrow( UUID activityId ) {
return Objects.requireNonNull( activities.get( activityId ), "Activity does not exist: " + activityId );
}


@Override
public List<Edge> getEdges() {
return edges.values()
Expand Down Expand Up @@ -264,7 +277,64 @@ public int getInPortCount( UUID activityId ) {


@Override
public void addActivity( ActivityWrapper activity ) {
public Set<UUID> getReachableActivities( UUID rootId, boolean includeRoot ) {
Set<UUID> visited = new HashSet<>();
Queue<UUID> open = new LinkedList<>( List.of( rootId ) );
while ( !open.isEmpty() ) {
UUID n = open.remove();
if ( visited.contains( n ) ) {
continue;
}
visited.add( n );
getOutEdges( n ).forEach( e -> open.add( e.getTo().getId() ) );
}

if ( !includeRoot ) {
visited.remove( rootId );
}
return visited;
}


private void resetAll( Collection<UUID> activities, StorageManager sm ) {
AttributedDirectedGraph<UUID, ExecutionEdge> subDag = GraphUtils.getInducedSubgraph( toDag(), activities );
for ( UUID n : TopologicalOrderIterator.of( subDag ) ) {
ActivityWrapper wrapper = this.activities.get( n );
wrapper.resetExecution();
sm.dropCheckpoints( n );
updatePreview( n );
for ( ExecutionEdge e : subDag.getOutwardEdges( n ) ) {
getEdge( e ).resetExecution();
}
}
}


@Override
public void reset( UUID activityId, StorageManager sm ) {
if ( activityId == null ) {
reset( sm );
return;
}
resetAll( getReachableActivities( activityId, true ), sm );
}


@Override
public void reset( StorageManager sm ) {
resetAll( getActivities().stream().map( ActivityWrapper::getId ).toList(), sm );
}


@Override
public ActivityWrapper addActivity( String activityType, RenderModel renderModel ) {
ActivityWrapper wrapper = ActivityWrapper.fromModel( new ActivityModel( activityType, renderModel ) );
addActivity( wrapper );
return wrapper;
}


private void addActivity( ActivityWrapper activity ) {
if ( activities.containsKey( activity.getId() ) ) {
throw new GenericRuntimeException( "Cannot add activity instance that is already part of this workflow." );
}
Expand All @@ -274,20 +344,63 @@ public void addActivity( ActivityWrapper activity ) {


@Override
public void deleteActivity( UUID activityId ) {
public void deleteActivity( UUID activityId, StorageManager sm ) {
Set<UUID> reachable = getReachableActivities( activityId, false );
edges.entrySet().removeIf( entry -> entry.getKey().left.equals( activityId ) || entry.getKey().right.equals( activityId ) );
activities.remove( activityId );
sm.dropCheckpoints( activityId );
resetAll( reachable, sm );
}


@Override
public void addEdge( EdgeModel model, StorageManager sm ) {
if ( getEdge( model ) != null ) {
throw new GenericRuntimeException( "Cannot add an edge that is already part of this workflow." );
}
Edge edge = Edge.fromModel( model, activities );
edges.computeIfAbsent( model.toPair(), k -> new ArrayList<>() ).add( edge );
reset( edge.getTo().getId(), sm );
}


@Override
public void deleteEdge( EdgeModel model ) {
public void deleteEdge( EdgeModel model, StorageManager sm ) {
Edge edge = getEdge( model );
List<Edge> edgeList = edges.get( model.toPair() );
if ( edgeList == null ) {
return;
}
edgeList.removeIf( e -> e.isEquivalent( model ) );
// TODO: reset target activity and all successors, update previews
edgeList.remove( edge );
reset( edge.getTo().getId(), sm );
}


@Override
public ActivityWrapper updateActivity( UUID activityId, @Nullable Map<String, JsonNode> settings, @Nullable ActivityConfigModel config, @Nullable RenderModel rendering, StorageManager sm ) {
ActivityWrapper wrapper = getActivityOrThrow( activityId );
if ( rendering != null ) {
wrapper.setRendering( rendering );
}

boolean requiresReset = false;
if ( config != null ) {
requiresReset = !wrapper.getConfig().equals( config );
wrapper.setConfig( config );
}
if ( settings != null ) {
requiresReset = requiresReset || !wrapper.getSerializableSettings().equals( settings );
if ( settings.isEmpty() ) {
wrapper.resetSettings();
} else {
wrapper.updateSettings( settings );
}
}

if ( requiresReset ) {
reset( activityId, sm );
}
return wrapper;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public void updateSettings( Map<String, JsonNode> newSettings ) {
}


public void resetSettings() {
serializableSettings.clear();
serializableSettings.putAll( ActivityRegistry.getSerializableDefaultSettings( type ) );
}


public Settings resolveSettings() throws InvalidSettingException {
return ActivityRegistry.buildSettingValues( type, variables.resolveVariables( serializableSettings ) );
}
Expand Down Expand Up @@ -156,7 +162,7 @@ public void resetExecution() {

public static ActivityWrapper fromModel( ActivityModel model ) {
String type = model.getType();
// ensure the default value is used for missing settings
// ensuring the default value is used for missing settings
Map<String, JsonNode> settings = new HashMap<>( ActivityRegistry.getSerializableDefaultSettings( type ) );
settings.putAll( model.getSettings() );
return new ActivityWrapper( model.getId(), ActivityRegistry.activityFromType( type ), type, settings, model.getConfig(), model.getRendering() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.polypheny.db.workflow.dag.edges;

import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -43,8 +44,8 @@ public Edge( ActivityWrapper from, ActivityWrapper to ) {


public static Edge fromModel( EdgeModel model, Map<UUID, ActivityWrapper> activities ) {
ActivityWrapper from = activities.get( model.getFromId() );
ActivityWrapper to = activities.get( model.getToId() );
ActivityWrapper from = Objects.requireNonNull( activities.get( model.getFromId() ), "Cannot create edge from an unknown source activity" );
ActivityWrapper to = Objects.requireNonNull( activities.get( model.getToId() ), "Cannot create edge to an unknown target activity" );
if ( model.isControl() ) {
return new ControlEdge( from, to, model.getFromPort() );
} else {
Expand Down Expand Up @@ -84,6 +85,11 @@ public boolean isIgnored() {
}


public void resetExecution() {
state = EdgeState.IDLE;
}


public enum EdgeState {
IDLE,
ACTIVE,
Expand Down
Loading

0 comments on commit 33151eb

Please sign in to comment.