Skip to content

Commit

Permalink
Add support for activities with non-default DataStateMergers
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 9, 2024
1 parent 1709933 commit 0b1d8c4
Show file tree
Hide file tree
Showing 29 changed files with 528 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.polypheny.db.workflow.dag.edges.Edge;
import org.polypheny.db.workflow.dag.variables.ReadableVariableStore;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.WorkflowConfigModel;
import org.polypheny.db.workflow.models.WorkflowModel;
Expand Down Expand Up @@ -109,6 +110,14 @@ public interface Workflow {
*/
boolean hasStableInVariables( UUID activityId );

/**
* Returns a list containing a preview of all input types for the specified activity.
* Input types that are not yet available are represented by empty Optionals.
* As inactive data edges cannot transmit data, their entry in the list is null (not an empty Optional).
*
* @param activityId target activity
* @return a list of all input types ordered by inPort index
*/
List<Optional<AlgDataType>> getInputTypes( UUID activityId );

int getInPortCount( UUID activityId );
Expand All @@ -121,9 +130,9 @@ public interface Workflow {

AttributedDirectedGraph<UUID, ExecutionEdge> toDag();

void validateStructure() throws Exception;
void validateStructure( StorageManager sm ) throws Exception;

void validateStructure( AttributedDirectedGraph<UUID, ExecutionEdge> subDag ) throws IllegalStateException;
void validateStructure( StorageManager sm, AttributedDirectedGraph<UUID, ExecutionEdge> subDag ) throws IllegalStateException;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import org.polypheny.db.util.graph.CycleDetector;
import org.polypheny.db.util.graph.TopologicalOrderIterator;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper.ActivityState;
import org.polypheny.db.workflow.dag.edges.DataEdge;
import org.polypheny.db.workflow.dag.edges.Edge;
import org.polypheny.db.workflow.dag.edges.Edge.EdgeState;
import org.polypheny.db.workflow.dag.variables.ReadableVariableStore;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge.ExecutionEdgeFactory;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType;
import org.polypheny.db.workflow.models.ActivityModel;
import org.polypheny.db.workflow.models.EdgeModel;
Expand Down Expand Up @@ -205,7 +207,11 @@ public List<Optional<AlgDataType>> getInputTypes( UUID activityId ) {

for ( int i = 0; i < getInPortCount( activityId ); i++ ) {
DataEdge dataEdge = getDataEdge( activityId, i );
inputTypes.add( dataEdge.getFrom().getOutTypePreview().get( dataEdge.getFromPort() ) );
if ( dataEdge.getState() == EdgeState.INACTIVE ) {
inputTypes.add( null );
} else {
inputTypes.add( dataEdge.getFrom().getOutTypePreview().get( dataEdge.getFromPort() ) );
}
}
return inputTypes;
}
Expand Down Expand Up @@ -255,13 +261,13 @@ public AttributedDirectedGraph<UUID, ExecutionEdge> toDag() {


@Override
public void validateStructure() throws Exception {
validateStructure( toDag() );
public void validateStructure( StorageManager sm ) throws Exception {
validateStructure( sm, toDag() );
}


@Override
public void validateStructure( AttributedDirectedGraph<UUID, ExecutionEdge> subDag ) throws IllegalStateException {
public void validateStructure( StorageManager sm, AttributedDirectedGraph<UUID, ExecutionEdge> subDag ) throws IllegalStateException {
if ( subDag.vertexSet().isEmpty() && subDag.edgeSet().isEmpty() ) {
return;
}
Expand All @@ -283,6 +289,19 @@ public void validateStructure( AttributedDirectedGraph<UUID, ExecutionEdge> subD
for ( UUID n : TopologicalOrderIterator.of( subDag ) ) {
ActivityWrapper wrapper = getActivity( n );
CommonType type = wrapper.getConfig().getCommonType();

if ( wrapper.getState() == ActivityState.SAVED ) {
if ( !sm.hasAllCheckpoints( n, wrapper.getDef().getOutPorts().length ) ) {
throw new IllegalStateException( "Found missing checkpoint for saved activity: " + wrapper );
}
} else if ( wrapper.getState() != ActivityState.FINISHED ) {
for ( int i = 0; i < wrapper.getDef().getOutPorts().length; i++ ) {
if ( sm.hasCheckpoint( n, i ) ) {
throw new IllegalStateException( "Found a checkpoint for an activity that has not yet been executed successfully: " + wrapper );
}
}
}

Set<Integer> requiredInPorts = wrapper.getDef().getRequiredInPorts();
Set<Integer> occupiedInPorts = new HashSet<>();
for ( ExecutionEdge execEdge : subDag.getInwardEdges( n ) ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface Activity {
* If a setting, input or output type is available, it is guaranteed to not change anymore.
* This method should be idempotent.
*
* @param inTypes a list of {@link Optional<AlgDataType>} representing the input tuple types.
* @param inTypes a list of {@link Optional<AlgDataType>} representing the input tuple types. For inactive edges, the entry is null (important for non-default DataStateMergers).
* @param settings a map of setting keys to {@link Optional<SettingValue>} representing the available settings, i.e. all settings that do not contain variables.
* @return a list of {@link Optional<AlgDataType>} representing the expected output tuple types.
* If an output type cannot be determined at this point, the corresponding {@link Optional} will be empty.
Expand All @@ -61,7 +61,7 @@ static List<Optional<AlgDataType>> wrapType( @Nullable AlgDataType type ) {
* CheckpointWriters for any outputs are created from the ExecutionContext.
* The settings do not incorporate any changes to variables from {@code updateVariables()}.
*
* @param inputs a list of input readers for each input specified by the annotation.
* @param inputs a list of input readers for each input specified by the annotation. For activities with a custom DataStateMerger that allows inactive inputs, readers of inactive edges are null.
* @param settings the instantiated setting values, according to the specified settings annotations
* @param ctx ExecutionContext to be used for creating checkpoints, updating progress and periodically checking for an abort
* @throws Exception in case the execution fails or is interrupted at any point
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.stream.Collectors;
import lombok.Getter;
import org.polypheny.db.type.PolySerializable;
import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidSettingException;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.AdvancedGroup;
import org.polypheny.db.workflow.dag.annotations.DefaultGroup;
Expand Down Expand Up @@ -91,14 +92,22 @@ public static Activity activityFromType( String activityType ) {
* @return an unmodifiable map of setting keys to their corresponding {@code SettingValue} instances.
* @throws IllegalArgumentException if no activity definition is found for the provided {@code activityType}
* or if a JsonNode has an unexpected format.
* @throws InvalidSettingException if the settingValue is not valid for the corresponding SettingDef
*/
public static Settings buildSettingValues( String activityType, Map<String, JsonNode> resolved ) {
public static Settings buildSettingValues( String activityType, Map<String, JsonNode> resolved ) throws InvalidSettingException {
return buildSettingValues( activityType, resolved, false );
}


public static Settings buildSettingValues( String activityType, Map<String, JsonNode> resolved, boolean disableValidation ) throws InvalidSettingException {
Map<String, SettingDef> settingDefs = get( activityType ).getSettings();

Map<String, SettingValue> settingValues = new HashMap<>();
for ( Entry<String, JsonNode> entry : resolved.entrySet() ) {
String key = entry.getKey();
SettingValue settingValue = settingDefs.get( key ).buildValue( entry.getValue() );
SettingValue settingValue = disableValidation ?
settingDefs.get( key ).buildValue( entry.getValue() ) :
settingDefs.get( key ).buildValidatedValue( entry.getValue() );
settingValues.put( key, settingValue );
}
return new Settings( settingValues );
Expand All @@ -112,16 +121,20 @@ public static Settings buildSettingValues( String activityType, Map<String, Json
* @param activityType the identifier for the activity type.
* @param resolved a map of setting keys to {@link Optional<JsonNode>} values, where unresolved settings are {@link Optional#empty()}.
* @return a wrapper around a map of setting keys to {@link Optional<SettingValue>} instances, where missing or unresolved settings are {@link Optional#empty()}.
* @throws IllegalArgumentException if the {@code activityType} is invalid or a {@link JsonNode} has an unexpected format.
* @throws IllegalArgumentException if the {@code activityType} is invalid or a {@link JsonNode} has an unexpected format.
* @throws InvalidSettingException if an available setting value is not valid for the corresponding SettingDef
*/
public static SettingsPreview buildAvailableSettingValues( String activityType, Map<String, Optional<JsonNode>> resolved ) {
public static SettingsPreview buildAvailableSettingValues( String activityType, Map<String, Optional<JsonNode>> resolved ) throws InvalidSettingException {
Map<String, SettingDef> settingDefs = get( activityType ).getSettings();

Map<String, Optional<SettingValue>> settingValues = new HashMap<>();
for ( Entry<String, Optional<JsonNode>> entry : resolved.entrySet() ) {
String key = entry.getKey();
Optional<SettingValue> settingValue = entry.getValue().map( v -> settingDefs.get( key ).buildValue( v ) );
settingValues.put( key, settingValue );
Optional<JsonNode> node = entry.getValue();
if ( node.isPresent() ) {
settingValues.put( key, Optional.of( settingDefs.get( key ).buildValidatedValue( node.get() ) ) );
} else {
settingValues.put( key, Optional.empty() );
}
}
return new SettingsPreview( settingValues );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.Setter;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.workflow.dag.activities.Activity.ControlStateMerger;
import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidSettingException;
import org.polypheny.db.workflow.dag.edges.ControlEdge;
import org.polypheny.db.workflow.dag.edges.DataEdge;
import org.polypheny.db.workflow.dag.edges.Edge;
Expand Down Expand Up @@ -74,14 +75,14 @@ public void updateSettings( Map<String, JsonNode> newSettings ) {
}


public Settings resolveSettings() {
public Settings resolveSettings() throws InvalidSettingException {
return ActivityRegistry.buildSettingValues( type, variables.resolveVariables( serializableSettings ) );
}

// TODO: be careful to use correct variables (must be sure they are correct)


public SettingsPreview resolveAvailableSettings( boolean hasStableVariables ) {
public SettingsPreview resolveAvailableSettings( boolean hasStableVariables ) throws InvalidSettingException {
VariableStore store = hasStableVariables ? variables : new VariableStore();
return ActivityRegistry.buildAvailableSettingValues( type, store.resolveAvailableVariables( serializableSettings ) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface Fusable extends Activity {
* If this method is overridden, it is required to also provide a custom execute implementation.
* This is necessary, as it will be used in the case that the activity cannot be fused.
*
* @param inTypes preview of the input types
* @param inTypes preview of the input types. For inactive edges, the type is null (important for non-default DataStateMergers).
* @param settings preview of the settings
* @return an Optional containing the final decision whether this activity can be fused, or an empty Optional if it cannot be stated at this point.
*/
Expand All @@ -54,7 +54,7 @@ default void execute( List<CheckpointReader> inputs, Settings settings, Executio
/**
* Return an AlgNode representing the new root of a logical query plan.
*
* @param inputs A list of logical input AlgNodes. For relational inputs, the first column contains the primary key. Make sure to remove unnecessary primary key columns, for instance when joining 2 tables.
* @param inputs A list of logical input AlgNodes. For relational inputs, the first column contains the primary key. Make sure to remove unnecessary primary key columns, for instance when joining 2 tables. For inactive edges, the AlgNode is null (important for non-default DataStateMergers).
* @param settings The resolved settings
* @param cluster the AlgCluster that is used for the construction of the query plan
* @return The created logical AlgNode. In case of a relational result, its tuple type has the first column reserved for the primary key. It can be left empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface Pipeable extends Activity {
* If this method is overridden, it is required to also provide a custom execute implementation.
* This is necessary, as it might be used in the case that this activity cannot be piped.
*
* @param inTypes preview of the input types
* @param inTypes preview of the input types. For inactive edges, the entry is null (important for non-default DataStateMergers).
* @param settings preview of the settings
* @return an Optional containing the final decision whether this activity can be piped, or an empty Optional if it cannot be stated at this point
*/
Expand All @@ -53,7 +53,7 @@ default Optional<Boolean> canPipe( List<Optional<AlgDataType>> inTypes, Settings
* It is always desirable to override this method with a custom implementation that does not depend on {@code pipe()}.
* An issue of this implementation is that it does not check the ExecutionContext for interrupts -> no early return in case of an interrupt!
*
* @param inputs a list of input readers for each input specified by the annotation.
* @param inputs a list of input readers for each input specified by the annotation. For inactive edges, the entry is null (important for non-default DataStateMergers).
* @param settings the instantiated setting values, according to the specified settings annotations
* @param ctx ExecutionContext to be used for creating checkpoints, updating progress and periodically checking for an abort
* @throws Exception in case the execution fails or is interrupted at any point
Expand All @@ -79,7 +79,7 @@ assert canPipe(
* Define the output type of this pipe.
* Afterward, it may no longer be changed until reset() is called.
*
* @param inTypes the types of the input pipes
* @param inTypes the types of the input pipes. For inactive edges, the entry is null (important for non-default DataStateMergers).
* @param settings the resolved settings
* @return the compulsory output type of this instance until the next call to reset(), or null if this activity has no outputs.
*/
Expand All @@ -91,7 +91,7 @@ assert canPipe(
* The Pipeable activity is not expected to close the output itself. This is done by the executor
* after the pipe method returns.
*
* @param inputs the InputPipes to iterate over
* @param inputs the InputPipes to iterate over. For inactive edges, the pipe is null (important for non-default DataStateMergers).
* @param output the output pipe to which output tuples that respect the locked output type are sent, or null if this activity has no output
* @param settings the resolved settings
* @param ctx ExecutionContext to be used for updating progress (interrupt checking is done automatically by the pipes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
outPorts = { @OutPort(type = PortType.REL) }
)

@IntSetting(key = "delay", displayName = "Delay (ms)", defaultValue = 1000)
@IntSetting(key = "delay", displayName = "Delay (ms)", defaultValue = 500)
@BoolSetting(key = "isSuccessful", displayName = "Successful Execution", defaultValue = true)
public class DebugActivity implements Activity {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeFactory;
import org.polypheny.db.algebra.type.AlgDataTypeField;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.logical.LogicalTable;
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.languages.LanguageManager;
Expand All @@ -41,7 +40,6 @@
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidSettingException;
import org.polypheny.db.workflow.dag.activities.Fusable;
import org.polypheny.db.workflow.dag.activities.Pipeable;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
Expand All @@ -63,7 +61,7 @@
inPorts = {},
outPorts = { @OutPort(type = PortType.REL) })

@EntitySetting(key = TABLE_KEY, displayName = "Table", dataModel = DataModel.RELATIONAL)
@EntitySetting(key = TABLE_KEY, displayName = "Table", dataModel = DataModel.RELATIONAL, mustExist = true)

public class RelExtractActivity implements Activity, Fusable, Pipeable {

Expand All @@ -77,7 +75,7 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>
Optional<EntityValue> table = settings.get( TABLE_KEY, EntityValue.class );

if ( table.isPresent() ) {
AlgDataType type = getOutputType( getEntity( table.get() ) );
AlgDataType type = getOutputType( table.get().getTable() );
return Activity.wrapType( type );
}
return Activity.wrapType( null );
Expand All @@ -86,7 +84,7 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>

@Override
public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
LogicalTable table = getEntity( settings.get( TABLE_KEY, EntityValue.class ) );
LogicalTable table = settings.get( TABLE_KEY, EntityValue.class ).getTable();
AlgDataType type = getOutputType( table );


Expand Down Expand Up @@ -158,7 +156,7 @@ public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster

@Override
public AlgDataType lockOutputType( List<AlgDataType> inTypes, Settings settings ) throws Exception {
lockedEntity = getEntity( settings.get( TABLE_KEY, EntityValue.class ) );
lockedEntity = settings.get( TABLE_KEY, EntityValue.class ).getTable();
return getOutputType( lockedEntity );
}

Expand All @@ -170,12 +168,6 @@ public void pipe( List<InputPipe> inputs, OutputPipe output, Settings settings,
}


private LogicalTable getEntity( EntityValue setting ) throws ActivityException {
return Catalog.snapshot().rel().getTable( setting.getNamespace(), setting.getName() ).orElseThrow(
() -> new InvalidSettingException( "Specified table does not exist", "table" ) );
}


private AlgDataType getOutputType( LogicalTable table ) {
// we insert the primary key column
AlgDataTypeFactory factory = AlgDataTypeFactory.DEFAULT;
Expand Down
Loading

0 comments on commit 0b1d8c4

Please sign in to comment.