diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/WorkflowManager.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/WorkflowManager.java
index 5a0a87b9e6..d41efd5942 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/WorkflowManager.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/WorkflowManager.java
@@ -18,7 +18,6 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.javalin.http.ContentType;
 import io.javalin.http.Context;
@@ -91,7 +90,7 @@ private void createExecuteDummyWorkflowTest() {
         server.addSerializedRoute( PATH + "/executeDummy/{namespaceName}/{tableName}/{storeName}", ctx -> {
             System.out.println( "handling dummy execution..." );
-            JsonMapper mapper = new JsonMapper();
+            ObjectMapper mapper = new ObjectMapper();
             ObjectNode setting = mapper.createObjectNode();
             setting.put( "namespace", ctx.pathParam( "namespaceName" ) );
             setting.put( "name", ctx.pathParam( "tableName" ) );
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java
index 79a2af8a86..98d9778884 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/WorkflowImpl.java
@@ -59,9 +59,7 @@ public class WorkflowImpl implements Workflow {
 
     public WorkflowImpl() {
-        this.activities = new ConcurrentHashMap<>();
-        this.edges = new ConcurrentHashMap<>();
-        this.config = WorkflowConfigModel.of();
+        this( new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), WorkflowConfigModel.of() );
     }
 
@@ -69,6 +67,8 @@ private WorkflowImpl( Map<UUID, ActivityWrapper> activities, Map<
[...]
             case LPG -> DataModel.GRAPH;
         };
     }
+
+
+    public static PortType fromDataModel( DataModel model ) {
+        return switch ( model ) {
+            case RELATIONAL -> REL;
+            case DOCUMENT -> DOC;
+            case GRAPH -> LPG;
+        };
+    }
 }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java
index a3e17c3bb8..5d06ae1242 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/IdentityActivity.java
@@ -18,11 +18,15 @@
 
 import java.util.List;
 import java.util.Optional;
+import org.polypheny.db.algebra.AlgNode;
+import org.polypheny.db.algebra.logical.relational.LogicalRelProject;
 import org.polypheny.db.algebra.type.AlgDataType;
+import org.polypheny.db.plan.AlgCluster;
 import org.polypheny.db.workflow.dag.activities.Activity;
 import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
 import org.polypheny.db.workflow.dag.activities.Activity.PortType;
 import org.polypheny.db.workflow.dag.activities.ActivityException;
+import org.polypheny.db.workflow.dag.activities.Fusable;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
@@ -50,7 +54,7 @@
 )
 @IntSetting(key = "I2", displayName = "THIRD", defaultValue = 0, isList = true, group = "groupA")
 @StringSetting(key = "S2", displayName = "FOURTH", defaultValue = "test", isList = true, group = "groupA", subGroup = "a")
-public class IdentityActivity implements Activity {
+public class IdentityActivity implements Activity, Fusable {
 
     public IdentityActivity() {
@@ -72,6 +76,13 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
     }
 
 
+    @Override
+    public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
+        // to make it more interesting, we add a project activity that doesn't change the tupleType
+        return LogicalRelProject.identity( inputs.get( 0 ) );
+    }
+
+
     @Override
     public void reset() {
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelExtractActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelExtractActivity.java
index 5639c1cb76..2c772563cd 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelExtractActivity.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelExtractActivity.java
@@ -18,12 +18,16 @@
 
 import static org.polypheny.db.workflow.dag.activities.impl.RelExtractActivity.TABLE_KEY;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.IntStream;
 import org.apache.commons.lang3.NotImplementedException;
 import org.polypheny.db.ResultIterator;
 import org.polypheny.db.algebra.AlgNode;
+import org.polypheny.db.algebra.logical.relational.LogicalRelProject;
+import org.polypheny.db.algebra.logical.relational.LogicalRelScan;
 import org.polypheny.db.algebra.type.AlgDataType;
 import org.polypheny.db.algebra.type.AlgDataTypeFactory;
 import org.polypheny.db.algebra.type.AlgDataTypeField;
@@ -32,8 +36,12 @@
 import org.polypheny.db.languages.LanguageManager;
 import org.polypheny.db.languages.QueryLanguage;
 import org.polypheny.db.plan.AlgCluster;
+import org.polypheny.db.plan.AlgTraitSet;
 import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
 import org.polypheny.db.processing.QueryContext;
+import org.polypheny.db.rex.RexIndexRef;
+import org.polypheny.db.rex.RexNode;
+import org.polypheny.db.schema.trait.ModelTrait;
 import org.polypheny.db.transaction.Transaction;
 import org.polypheny.db.type.PolyType;
 import org.polypheny.db.workflow.dag.activities.Activity;
@@ -144,13 +152,18 @@ public void reset() {
 
     @Override
     public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
-        throw new NotImplementedException();
-        /*LogicalTable table = getEntity( settings.get( TABLE_KEY ) );
-        AlgDataType type = getOutputType( table );
+        LogicalTable table = settings.get( TABLE_KEY, EntityValue.class ).getTable();
         AlgTraitSet traits = AlgTraitSet.createEmpty().plus( ModelTrait.RELATIONAL );
         AlgNode scan = new LogicalRelScan( cluster, traits, table );
-        return LogicalRelProject.create( scan, , );*/
+        List<RexNode> projects = new ArrayList<>();
+        projects.add( cluster.getRexBuilder().makeBigintLiteral( new BigDecimal( 0 ) ) ); // Add new PK col
+        IntStream.range( 0, table.getTupleType().getFieldCount() )
+                .mapToObj( i ->
+                        new RexIndexRef( i, table.getTupleType().getFields().get( i ).getType() )
+                ).forEach( projects::add );
+
+        return LogicalRelProject.create( scan, projects, getOutputType( table ) );
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelLoadActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelLoadActivity.java
index 25dcd37f74..d4107eeef2 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelLoadActivity.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelLoadActivity.java
@@ -51,7 +51,7 @@
 import org.polypheny.db.workflow.engine.execution.context.PipeExecutionContext;
 import org.polypheny.db.workflow.engine.execution.pipe.InputPipe;
 import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe;
-import org.polypheny.db.workflow.engine.storage.QueryUtils.BatchWriter;
+import org.polypheny.db.workflow.engine.storage.BatchWriter;
 import org.polypheny.db.workflow.engine.storage.StorageManager;
 import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
 import org.polypheny.db.workflow.engine.storage.writer.RelWriter;
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelMergeActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelMergeActivity.java
index c3f46771c9..229e98ba18 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelMergeActivity.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelMergeActivity.java
@@ -16,14 +16,16 @@
 
 package org.polypheny.db.workflow.dag.activities.impl;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import org.polypheny.db.algebra.type.AlgDataType;
+import org.polypheny.db.type.entity.PolyValue;
+import org.polypheny.db.util.Pair;
 import org.polypheny.db.workflow.dag.activities.Activity;
 import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
 import org.polypheny.db.workflow.dag.activities.Activity.PortType;
 import org.polypheny.db.workflow.dag.activities.ActivityException;
-import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidInputException;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
@@ -50,10 +52,7 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>
             return List.of( Optional.empty() );
         }
 
-        if ( !firstType.get().equals( secondType.get() ) ) {
-            throw new InvalidInputException( "The second input type is not equal to the first input type", 1 );
-        }
-        return List.of( firstType );
+        return Activity.wrapType( RelUnionActivity.getTypeOrThrow( List.of( firstType.get(), secondType.get() ) ) );
     }
 
@@ -73,8 +72,9 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
                 .queryLanguage( "SQL" )
                 .query( "SELECT * FROM " + CheckpointQuery.ENTITY( 0 ) + " UNION ALL SELECT * FROM " + CheckpointQuery.ENTITY( 1 ) )
                 .build();
-        try ( CheckpointWriter writer = ctx.createRelWriter( 0, input0.getTupleType(), true ) ) {
-            writer.write( input0.getIteratorFromQuery( query, inputs ) );
+        Pair<AlgDataType, Iterator<List<PolyValue>>> result = input0.getIteratorFromQuery( query, inputs );
+        try ( CheckpointWriter writer = ctx.createRelWriter( 0, result.left, true ) ) {
+            writer.write( result.right );
         }
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelUnionActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelUnionActivity.java
index 27243b5159..6184912f70 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelUnionActivity.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelUnionActivity.java
@@ -16,14 +16,22 @@
 
 package org.polypheny.db.workflow.dag.activities.impl;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import org.polypheny.db.algebra.AlgNode;
+import org.polypheny.db.algebra.logical.relational.LogicalRelUnion;
 import org.polypheny.db.algebra.type.AlgDataType;
+import org.polypheny.db.algebra.type.AlgDataTypeFactory;
+import org.polypheny.db.plan.AlgCluster;
+import org.polypheny.db.type.entity.PolyValue;
+import org.polypheny.db.util.Pair;
 import org.polypheny.db.workflow.dag.activities.Activity;
 import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
 import org.polypheny.db.workflow.dag.activities.Activity.PortType;
 import org.polypheny.db.workflow.dag.activities.ActivityException;
 import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidInputException;
+import org.polypheny.db.workflow.dag.activities.Fusable;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
@@ -38,7 +46,7 @@
         inPorts = { @InPort(type = PortType.REL), @InPort(type = PortType.REL) },
         outPorts = { @OutPort(type = PortType.REL) }
 )
-public class RelUnionActivity implements Activity {
+public class RelUnionActivity implements Activity, Fusable {
 
     @Override
     public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>> inTypes, SettingsPreview settings ) throws ActivityException {
@@ -48,10 +56,7 @@
         if ( firstType.isEmpty() || secondType.isEmpty() ) {
             return List.of( Optional.empty() );
         }
-        if ( !firstType.get().equals( secondType.get() ) ) {
-            throw new InvalidInputException( "The second input type is not equal to the first input type", 1 );
-        }
-        return List.of( firstType );
+        return Activity.wrapType( getTypeOrThrow( List.of( firstType.get(), secondType.get() ) ) );
     }
 
@@ -61,16 +66,34 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
                 .queryLanguage( "SQL" )
                 .query( "SELECT * FROM " + CheckpointQuery.ENTITY( 0 ) + " UNION ALL SELECT * FROM " + CheckpointQuery.ENTITY( 1 ) )
                 .build();
-        try ( CheckpointWriter writer = ctx.createRelWriter( 0, inputs.get( 0 ).getTupleType(), true ) ) {
-            writer.write( inputs.get( 0 ).getIteratorFromQuery( query, inputs ) );
+        Pair<AlgDataType, Iterator<List<PolyValue>>> result = inputs.get( 0 ).getIteratorFromQuery( query, inputs );
+        try ( CheckpointWriter writer = ctx.createRelWriter( 0, result.left, true ) ) {
+            writer.write( result.right );
         }
     }
 
 
+    @Override
+    public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
+        System.out.println( "in types: " + inputs.stream().map( AlgNode::getTupleType ).toList() );
+        return LogicalRelUnion.create( inputs, true );
+    }
+
+
     @Override
     public void reset() {
     }
+
+
+    public static AlgDataType getTypeOrThrow( List<AlgDataType> inputs ) throws InvalidInputException {
+        AlgDataType type = AlgDataTypeFactory.DEFAULT.leastRestrictive( inputs );
+
+        if ( type == null ) {
+            throw new InvalidInputException( "The tuple types of the inputs are incompatible", 1 );
+        }
+        return type;
+    }
+
 }
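Note on the RelUnionActivity change above: previewOutTypes no longer requires the two input tuple types to be exactly equal; getTypeOrThrow instead asks the type factory for the least restrictive common type. A minimal sketch of the behavior this relies on (the scalar types below are illustrative assumptions, not taken from the patch):

    // Illustrative sketch: leastRestrictive returns a common type, or null if none exists.
    AlgDataTypeFactory factory = AlgDataTypeFactory.DEFAULT;
    AlgDataType intType = factory.createPolyType( PolyType.INTEGER );
    AlgDataType bigintType = factory.createPolyType( PolyType.BIGINT );
    AlgDataType merged = factory.leastRestrictive( List.of( intType, bigintType ) ); // e.g. BIGINT
    // Incompatible inputs yield null, which getTypeOrThrow converts into an
    // InvalidInputException instead of the previous strict equality check.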
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelValuesActivity.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelValuesActivity.java
index da58f5e654..8b1af8e618 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelValuesActivity.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/activities/impl/RelValuesActivity.java
@@ -16,20 +16,29 @@
 
 package org.polypheny.db.workflow.dag.activities.impl;
 
+import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
+import org.polypheny.db.algebra.AlgNode;
+import org.polypheny.db.algebra.logical.relational.LogicalRelValues;
 import org.polypheny.db.algebra.type.AlgDataType;
 import org.polypheny.db.algebra.type.AlgDataTypeFactory;
+import org.polypheny.db.algebra.type.AlgDataTypeField;
+import org.polypheny.db.plan.AlgCluster;
+import org.polypheny.db.rex.RexBuilder;
+import org.polypheny.db.rex.RexLiteral;
 import org.polypheny.db.type.PolyType;
 import org.polypheny.db.type.entity.PolyString;
 import org.polypheny.db.type.entity.PolyValue;
 import org.polypheny.db.type.entity.numerical.PolyInteger;
+import org.polypheny.db.type.entity.numerical.PolyLong;
 import org.polypheny.db.workflow.dag.activities.Activity;
 import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
 import org.polypheny.db.workflow.dag.activities.Activity.PortType;
 import org.polypheny.db.workflow.dag.activities.ActivityException;
+import org.polypheny.db.workflow.dag.activities.Fusable;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
 import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
 import org.polypheny.db.workflow.dag.annotations.BoolSetting;
@@ -49,7 +58,7 @@
 )
 @IntSetting(key = "rowCount", displayName = "Row Count", defaultValue = 3, min = 1, max = 1_000_000)
 @BoolSetting(key = "fixSeed", displayName = "Fix Random Seed", defaultValue = false)
-public class RelValuesActivity implements Activity {
+public class RelValuesActivity implements Activity, Fusable {
 
     private static final List<String> NAMES = List.of( "Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Hank" );
     private static final List<String> LAST_NAMES = List.of( "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis" );
@@ -72,6 +81,16 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
     }
 
 
+    @Override
+    public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
+        List<List<PolyValue>> values = getValues(
+                settings.get( "rowCount", IntValue.class ).getValue(),
+                settings.get( "fixSeed", BoolValue.class ).getValue()
+        );
+        return LogicalRelValues.create( cluster, getType(), toRexLiterals( getType(), values ) );
+    }
+
+
     @Override
     public void reset() {
 
@@ -104,8 +123,33 @@ private static List<List<PolyValue>> getValues( int n, boolean fixSeed ) {
     }
 
 
+    private static ImmutableList<ImmutableList<RexLiteral>> toRexLiterals( AlgDataType tupleType, List<List<PolyValue>> rows ) {
+        RexBuilder builder = new RexBuilder( AlgDataTypeFactory.DEFAULT );
+        List<ImmutableList<RexLiteral>> records = new ArrayList<>();
+        List<AlgDataType> fieldTypes = tupleType.getFields().stream().map( AlgDataTypeField::getType ).toList();
+        for ( final List<PolyValue> row : rows ) {
+            final List<RexLiteral> record = new ArrayList<>();
+            for ( int i = 0; i < row.size(); ++i ) {
+                // TODO: fix creation of RexLiteral
+                PolyValue value = row.get( i );
+                AlgDataType type = fieldTypes.get( i );
+                record.add( new RexLiteral( value, type, type.getPolyType() ) );
+
+                //record.add( builder.makeLiteral( value, type, type.getPolyType() ) );
+            }
+            records.add( ImmutableList.copyOf( record ) );
+        }
+        return ImmutableList.copyOf( records );
+    }
+
+
     private static List<PolyValue> getRow( String name, String lastName, int age, int salary ) {
-        return List.of( PolyInteger.of( 0 ), PolyString.of( name ), PolyString.of( lastName ), PolyInteger.of( age ), PolyInteger.of( salary ) );
+        return List.of(
+                PolyLong.of( 0 ),
+                PolyString.of( name ),
+                PolyString.of( lastName ),
+                PolyInteger.of( age ),
+                PolyInteger.of( salary ) );
     }
 
 }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/BoolValue.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/BoolValue.java
index 449bb9a874..47ffc9f464 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/BoolValue.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/BoolValue.java
@@ -17,7 +17,7 @@
 package org.polypheny.db.workflow.dag.settings;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.BooleanNode;
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -52,7 +52,7 @@ public static BoolValue of( JsonNode node ) {
 
 
     @Override
-    public JsonNode toJson( JsonMapper mapper ) {
+    public JsonNode toJson( ObjectMapper mapper ) {
         return BooleanNode.valueOf( value );
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/EntityValue.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/EntityValue.java
index 1ea117d49b..231d537071 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/EntityValue.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/EntityValue.java
@@ -17,7 +17,7 @@
 package org.polypheny.db.workflow.dag.settings;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Objects;
 import lombok.Value;
 import org.polypheny.db.catalog.Catalog;
@@ -53,7 +53,7 @@ public static EntityValue of( JsonNode node ) {
 
 
     @Override
-    public JsonNode toJson( JsonMapper mapper ) {
+    public JsonNode toJson( ObjectMapper mapper ) {
         return mapper.createObjectNode()
                 .put( "namespace", namespace )
                 .put( "name", name );
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/IntValue.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/IntValue.java
index 85cd4bdea7..febb046e0d 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/IntValue.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/IntValue.java
@@ -17,7 +17,7 @@
 package org.polypheny.db.workflow.dag.settings;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.IntNode;
 import lombok.Value;
 import org.polypheny.db.workflow.dag.settings.SettingDef.SettingValue;
@@ -44,7 +44,7 @@ public static IntValue of( JsonNode node ) {
 
 
     @Override
-    public JsonNode toJson( JsonMapper mapper ) {
+    public JsonNode toJson( ObjectMapper mapper ) {
         return IntNode.valueOf( value );
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/ListValue.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/ListValue.java
index 781275d5ab..194b2cbd09 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/ListValue.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/ListValue.java
@@ -17,7 +17,7 @@
 package org.polypheny.db.workflow.dag.settings;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import java.util.ArrayList;
 import java.util.List;
@@ -75,7 +75,7 @@ public static <T extends SettingValue> ListValue<T> of( T entry ) {
 
 
     @Override
-    public JsonNode toJson( JsonMapper mapper ) {
+    public JsonNode toJson( ObjectMapper mapper ) {
         ArrayNode node = mapper.createArrayNode();
         for ( SettingValue value : values ) {
             node.add( value.toJson( mapper ) );
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/SettingDef.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/SettingDef.java
index 4625078af6..b8b469cade 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/SettingDef.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/SettingDef.java
@@ -18,7 +18,7 @@
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.lang.annotation.Annotation;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -159,7 +159,7 @@ public enum SettingType {
 
     public interface SettingValue extends Wrapper {
 
-        JsonNode toJson( JsonMapper mapper );
+        JsonNode toJson( ObjectMapper mapper );
 
     }
@@ -186,7 +186,7 @@ public SettingValue get( String key ) {
 
 
         public Map<String, JsonNode> getSerializableSettings() {
-            JsonMapper mapper = new JsonMapper();
+            ObjectMapper mapper = new ObjectMapper();
             Map<String, JsonNode> settingValues = new HashMap<>();
             for ( Entry<String, SettingValue> entry : map.entrySet() ) {
                 settingValues.put( entry.getKey(), entry.getValue().toJson( mapper ) );
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/StringValue.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/StringValue.java
index 438c5e4a2d..ae7aa0bbb0 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/StringValue.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/settings/StringValue.java
@@ -17,7 +17,7 @@
 package org.polypheny.db.workflow.dag.settings;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.TextNode;
 import java.util.Objects;
 import lombok.NonNull;
@@ -46,7 +46,7 @@ public static StringValue of( JsonNode node ) {
 
 
     @Override
-    public JsonNode toJson( JsonMapper mapper ) {
+    public JsonNode toJson( ObjectMapper mapper ) {
         return TextNode.valueOf( value );
     }
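Note on the JsonMapper -> ObjectMapper migration in the settings classes above: the SettingValue implementations only rely on node factories, so the more general ObjectMapper (the superclass of JsonMapper) is sufficient. A minimal sketch of the serialization contract, assuming the lombok-generated constructor of IntValue:

    // Sketch under assumptions: any ObjectMapper works, since toJson only needs
    // createObjectNode()/createArrayNode() or static node factories like IntNode.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode json = new IntValue( 42 ).toJson( mapper ); // IntNode holding 42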
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java
index f6e46b1166..d993a4673c 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/dag/variables/VariableStore.java
@@ -17,7 +17,7 @@
 package org.polypheny.db.workflow.dag.variables;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.Collections;
@@ -42,7 +42,7 @@ public class VariableStore implements ReadableVariableStore, WritableVariableSto
             ERROR_MSG_KEY ) );
 
-    private static final JsonMapper mapper = new JsonMapper();
+    private static final ObjectMapper mapper = new ObjectMapper();
 
     private final Map<String, JsonNode> variables = new HashMap<>();
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/DefaultExecutor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/DefaultExecutor.java
index 55f6c866bc..a7f4399168 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/DefaultExecutor.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/DefaultExecutor.java
@@ -56,7 +56,7 @@ void execute() throws ExecutorException {
 
 
     @Override
-    ExecutorType getType() {
+    public ExecutorType getType() {
         return ExecutorType.DEFAULT;
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/Executor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/Executor.java
index 6d05b842ff..5c940a12a8 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/Executor.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/Executor.java
@@ -54,7 +54,7 @@ protected Executor( StorageManager sm, Workflow workflow ) {
 
     abstract void execute() throws ExecutorException;
 
-    abstract ExecutorType getType();
+    public abstract ExecutorType getType();
 
 
     /**
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/FusionExecutor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/FusionExecutor.java
index 91e36b3829..bd0282d206 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/FusionExecutor.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/FusionExecutor.java
@@ -17,20 +17,32 @@
 package org.polypheny.db.workflow.engine.execution;
 
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
 import org.apache.commons.lang3.NotImplementedException;
 import org.polypheny.db.algebra.AlgNode;
+import org.polypheny.db.algebra.AlgRoot;
+import org.polypheny.db.algebra.constant.Kind;
+import org.polypheny.db.catalog.logistic.DataModel;
 import org.polypheny.db.plan.AlgCluster;
+import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
+import org.polypheny.db.rex.RexBuilder;
+import org.polypheny.db.transaction.Statement;
+import org.polypheny.db.transaction.Transaction;
+import org.polypheny.db.type.entity.PolyValue;
 import org.polypheny.db.util.graph.AttributedDirectedGraph;
 import org.polypheny.db.workflow.dag.Workflow;
+import org.polypheny.db.workflow.dag.activities.Activity.PortType;
 import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
 import org.polypheny.db.workflow.dag.activities.Fusable;
 import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
 import org.polypheny.db.workflow.engine.scheduler.ExecutionEdge;
+import org.polypheny.db.workflow.engine.storage.QueryUtils;
 import org.polypheny.db.workflow.engine.storage.StorageManager;
 import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
+import org.polypheny.db.workflow.engine.storage.writer.CheckpointWriter;
 
 /**
  * Executes a subgraph representing a set of fused activities, meaning they implement {@link org.polypheny.db.workflow.dag.activities.Fusable}
@@ -55,26 +67,52 @@ public FusionExecutor( StorageManager sm, Workflow workflow, AttributedDirectedG
 
     @Override
     void execute() throws ExecutorException {
         System.out.println( "Start execution fused tree: " + execTree );
-
+        ActivityWrapper rootWrapper = workflow.getActivity( rootId );
+        Transaction transaction = sm.getTransaction( rootId, rootWrapper.getConfig().getCommonType() );
+        Statement statement = transaction.createStatement();
+        AlgCluster cluster = AlgCluster.create(
+                statement.getQueryProcessor().getPlanner(),
+                new RexBuilder( statement.getTransaction().getTypeFactory() ),
+                null,
+                statement.getDataContext().getSnapshot() );
+
+        AlgRoot root;
         try {
-            // TODO: implement after PolyAlgebra is merged
-            //AlgNode node = constructAlgNode( rootId, cluster );
-
-            // 0. (verify node does not perform data manipulation)
-            // 1. exec node with TranslatedQueryContext
-            // 2. get result iterator
-            // 3. write result to checkpoint
+            root = AlgRoot.of( constructAlgNode( rootId, cluster ), Kind.SELECT );
         } catch ( Exception e ) {
-            // TODO: handle exception
+            throw new ExecutorException( e );
+        }
+        DataModel model = root.getModel().dataModel();
+        PortType definedType = rootWrapper.getDef().getOutPortTypes()[0];
+        if ( !PortType.fromDataModel( model ).canWriteTo( definedType ) ) {
+            throw new ExecutorException( "The data model of the fused AlgNode tree (" + model + ") is incompatible with the defined outPort type (" + definedType + ") of the root activity: " + execTree );
         }
-        throw new NotImplementedException();
+        if ( !QueryUtils.validateAlg( root, false, null ) ) {
+            throw new ExecutorException( "The fused AlgNode tree may not perform data manipulation: " + execTree );
+        }
+
+        ExecutedContext executedContext = QueryUtils.executeAlgRoot( root, statement );
+        if ( executedContext.getException().isPresent() ) {
+            throw new ExecutorException( "An error occurred while executing the fused activities: " + execTree );
+        }
+
+        Iterator<PolyValue[]> iterator = executedContext.getIterator().getIterator();
+        try ( CheckpointWriter writer = sm.createCheckpoint( rootId, 0, root.validatedRowType, true, rootWrapper.getConfig().getPreferredStore( 0 ), model ) ) {
+            while ( iterator.hasNext() ) {
+                writer.write( Arrays.asList( iterator.next() ) );
+            }
+        } catch ( Exception e ) {
+            throw new ExecutorException( e );
+        } finally {
+            executedContext.getIterator().close();
+        }
     }
 
 
     @Override
-    ExecutorType getType() {
+    public ExecutorType getType() {
         return ExecutorType.FUSION;
     }
@@ -97,8 +135,9 @@ private AlgNode constructAlgNode( UUID root, AlgCluster cluster ) throws Excepti
         for ( int i = 0; i < inputsArr.length; i++ ) {
             if ( inputsArr[i] == null ) {
                 // add remaining inputs for existing checkpoints
-                CheckpointReader reader = getReader( wrapper, i );
-                inputsArr[i] = reader == null ? null : reader.getAlgNode( cluster );
+                try ( CheckpointReader reader = getReader( wrapper, i ) ) {
+                    inputsArr[i] = reader == null ? null : reader.getAlgNode( cluster );
+                }
             }
         }
         List<AlgNode> inputs = Arrays.asList( inputsArr );
@@ -111,6 +150,7 @@
 
         Fusable activity = (Fusable) wrapper.getActivity();
         AlgNode fused = activity.fuse( inputs, settings, cluster );
+        System.out.println( "fused type of " + wrapper.getType() + " is: " + fused.getTupleType() );
         wrapper.setOutTypePreview( List.of( Optional.of( fused.getTupleType() ) ) );
         return fused;
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/PipeExecutor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/PipeExecutor.java
index 0b8b4492c7..e99119e917 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/PipeExecutor.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/PipeExecutor.java
@@ -134,7 +134,7 @@ void execute() throws ExecutorException {
 
 
     @Override
-    ExecutorType getType() {
+    public ExecutorType getType() {
         return ExecutorType.PIPE;
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/VariableWriterExecutor.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/VariableWriterExecutor.java
index 3ddb243af0..58487a131f 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/VariableWriterExecutor.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/VariableWriterExecutor.java
@@ -53,7 +53,7 @@ void execute() throws ExecutorException {
 
 
     @Override
-    ExecutorType getType() {
+    public ExecutorType getType() {
         return ExecutorType.VARIABLE_WRITER;
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContext.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContext.java
index 8043f53334..0b856d567e 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContext.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContext.java
@@ -16,14 +16,9 @@
 
 package org.polypheny.db.workflow.engine.execution.context;
 
-import java.util.Iterator;
-import java.util.List;
 import org.polypheny.db.algebra.type.AlgDataType;
 import org.polypheny.db.transaction.Transaction;
-import org.polypheny.db.type.entity.PolyValue;
 import org.polypheny.db.workflow.dag.activities.Activity;
-import org.polypheny.db.workflow.engine.storage.reader.CheckpointQuery;
-import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
 import org.polypheny.db.workflow.engine.storage.writer.CheckpointWriter;
 import org.polypheny.db.workflow.engine.storage.writer.DocWriter;
 import org.polypheny.db.workflow.engine.storage.writer.LpgWriter;
@@ -61,8 +56,6 @@
      */
     CheckpointWriter createWriter( int idx, AlgDataType tupleType, boolean resetPk );
 
-    Iterator<List<PolyValue>> getIteratorFromQuery( CheckpointQuery query, List<CheckpointReader> readers );
-
     /**
      * Returns a transaction to be used for extracting or loading data from data stores or data sources.
      * The transaction MUST NOT be committed or rolled back, as this is done externally.
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java
index f071ed44e7..9a127c2530 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/execution/context/ExecutionContextImpl.java
@@ -16,20 +16,14 @@
 
 package org.polypheny.db.workflow.engine.execution.context;
 
-import java.util.Iterator;
-import java.util.List;
 import java.util.Objects;
 import lombok.Getter;
-import org.apache.commons.lang3.NotImplementedException;
 import org.polypheny.db.algebra.type.AlgDataType;
 import org.polypheny.db.transaction.Transaction;
-import org.polypheny.db.type.entity.PolyValue;
 import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
 import org.polypheny.db.workflow.dag.activities.Activity.PortType;
 import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
 import org.polypheny.db.workflow.engine.storage.StorageManager;
-import org.polypheny.db.workflow.engine.storage.reader.CheckpointQuery;
-import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
 import org.polypheny.db.workflow.engine.storage.writer.CheckpointWriter;
 import org.polypheny.db.workflow.engine.storage.writer.DocWriter;
 import org.polypheny.db.workflow.engine.storage.writer.LpgWriter;
@@ -114,15 +108,6 @@ public CheckpointWriter createWriter( int idx, AlgDataType tupleType, boolean re
     }
 
 
-    @Override
-    public Iterator<List<PolyValue>> getIteratorFromQuery( CheckpointQuery query, List<CheckpointReader> readers ) {
-        // just like reader.getIteratorFromQuery(), but with the ability to use multiple checkpoints
-        // requires a special CheckpointQuery that can specify placeholders for any one of the reader, given its index.
-        // Idea for closing the iterator correctly: register it with one of the supplied readers.
-        throw new NotImplementedException();
-    }
-
-
     @Override
     public Transaction getTransaction() {
         if ( !activityWrapper.getDef().hasCategory( ActivityCategory.EXTRACT ) && !activityWrapper.getDef().hasCategory( ActivityCategory.LOAD ) ) {
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java
index 2aa6c809c8..2f1f6d27bc 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionEdge.java
@@ -126,8 +126,8 @@ public boolean equals( Object obj ) {
     @Override
     public String toString() {
         return "ExecutionEdge{" +
-                "source=" + source +
-                ", target=" + target +
+                "source=" + getSource().toString().substring( 0, 4 ) +
+                ", target=" + getTarget().toString().substring( 0, 4 ) +
                 ", isControl=" + isControl +
                 ", fromPort=" + fromPort +
                 ", toPort=" + toPort +
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java
index aa1d41df64..a8137cf855 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/ExecutionSubmission.java
@@ -25,10 +25,15 @@
 
 @Value
 public class ExecutionSubmission {
 
-    CommonType commonType;
     Executor executor;
     Set<UUID> activities;
     UUID rootId;
+    CommonType commonType;
     UUID sessionId;
 
+
+    public String toString() {
+        return "ExecutionSubmission(executor=" + this.getExecutor().getType() + ", activities=" + this.getActivities() + ", rootId=" + this.getRootId() + ", commonType=" + this.getCommonType() + ", sessionId=" + this.getSessionId() + ")";
+    }
+
 }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java
index ad187f7e9d..3b1337aae4 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/GlobalScheduler.java
@@ -142,7 +142,7 @@ private void submit( List<ExecutionSubmission> submissions ) {
                 result = new ExecutionResult( submission );
             } catch ( ExecutorException e ) {
                 result = new ExecutionResult( submission, e );
-            } catch ( Exception e ) {
+            } catch ( Throwable e ) {
                 result = new ExecutionResult( submission, new ExecutorException( "Unexpected exception", e ) );
             }
             activeSubmissions.get( sessionId ).remove( submission );
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java
index 095b40dc91..fa30697d76 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizer.java
@@ -174,7 +174,7 @@ public ExecutionSubmission create( StorageManager sm, Workflow wf ) {
             case VARIABLE_WRITER -> new VariableWriterExecutor( sm, wf, getRootActivity() );
         };
 
-        return new ExecutionSubmission( commonType, executor, activities, root, sm.getSessionId() );
+        return new ExecutionSubmission( executor, activities, root, commonType, sm.getSessionId() );
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java
index c64c7b0d21..569242265b 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/scheduler/optimizer/WorkflowOptimizerImpl.java
@@ -212,7 +212,7 @@ private List<Pair<Set<UUID>, NodeColor>> getFirstConnectedComponents( Attributed
             target = outEdge.getTarget();
             assert nodeColors.get( target ) == color;
 
-            outEdges = subDag.getOutwardEdges( start ).stream().filter( e -> edgeColors.get( e ) == color.compatibleEdge ).toList();
+            outEdges = subDag.getOutwardEdges( target ).stream().filter( e -> edgeColors.get( e ) == color.compatibleEdge ).toList();
             assert outEdges.size() <= 1 : "Found connected component which is not an inverted tree";
             outEdge = outEdges.isEmpty() ? null : outEdges.get( 0 );
         }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/BatchWriter.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/BatchWriter.java
new file mode 100644
index 0000000000..9d80877d14
--- /dev/null
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/BatchWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2019-2024 The Polypheny Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.polypheny.db.workflow.engine.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.polypheny.db.algebra.AlgRoot;
+import org.polypheny.db.algebra.type.AlgDataType;
+import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
+import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
+import org.polypheny.db.processing.QueryContext;
+import org.polypheny.db.processing.QueryContext.ParsedQueryContext;
+import org.polypheny.db.transaction.Statement;
+import org.polypheny.db.type.entity.PolyValue;
+import org.polypheny.db.util.Pair;
+
+public class BatchWriter implements AutoCloseable {
+
+    private static final long MAX_BYTES_PER_BATCH = 10 * 1024 * 1024L; // 10 MiB, upper limit to (estimated) size of batch in bytes
+    private static final int MAX_TUPLES_PER_BATCH = 10_000; // upper limit to tuples per batch
+
+
+    private final Map<Long, AlgDataType> paramTypes;
+    private final List<Map<Long, PolyValue>> paramValues = new ArrayList<>();
+    private long batchSize = -1;
+
+    private final Statement writeStatement;
+    private final Pair<ParsedQueryContext, AlgRoot> parsed;
+
+
+    public BatchWriter( QueryContext context, Statement statement, Map<Long, AlgDataType> paramTypes ) {
+        this.writeStatement = statement;
+        this.parsed = QueryUtils.parseAndTranslateQuery( context, writeStatement );
+        this.paramTypes = paramTypes;
+    }
+
+
+    public void write( Map<Long, PolyValue> valueMap ) {
+        if ( batchSize == -1 ) {
+            batchSize = QueryUtils.computeBatchSize( valueMap.values().toArray( new PolyValue[0] ), MAX_BYTES_PER_BATCH, MAX_TUPLES_PER_BATCH );
+        }
+        paramValues.add( valueMap );
+
+        if ( paramValues.size() < batchSize ) {
+            return;
+        }
+        executeBatch();
+    }
+
+
+    private void executeBatch() {
+        int batchSize = paramValues.size();
+
+        writeStatement.getDataContext().setParameterTypes( paramTypes );
+        writeStatement.getDataContext().setParameterValues( paramValues );
+
+        // create new implementation for each batch
+        ExecutedContext executedContext = QueryUtils.executeQuery( parsed, writeStatement );
+
+        if ( executedContext.getException().isPresent() ) {
+            throw new GenericRuntimeException( "An error occurred while writing a batch: ", executedContext.getException().get() );
+        }
+        List<List<PolyValue>> results = executedContext.getIterator().getAllRowsAndClose();
+        long changedCount = results.size() == 1 ? results.get( 0 ).get( 0 ).asLong().longValue() : 0;
+        if ( changedCount != batchSize ) {
+            throw new GenericRuntimeException( "Unable to write all values of the batch: " + changedCount + " of " + batchSize + " tuples were written" );
+        }
+
+        paramValues.clear();
+        writeStatement.getDataContext().resetParameterValues();
+    }
+
+
+    @Override
+    public void close() throws Exception {
+        if ( !paramValues.isEmpty() ) {
+            executeBatch();
+        }
+    }
+
+}
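For orientation, a usage sketch of the extracted BatchWriter; the prepared INSERT context, parameter indices, and types are assumptions (the class only requires a QueryContext with dynamic parameters and a matching Statement):

    // Sketch, not part of the patch: stream tuples through a BatchWriter, which
    // flushes automatically once the estimated batch size is reached.
    Map<Long, AlgDataType> paramTypes = Map.of( 0L, idType, 1L, nameType ); // assumed types
    try ( BatchWriter writer = new BatchWriter( insertContext, statement, paramTypes ) ) {
        for ( PolyValue[] row : rows ) {
            writer.write( Map.of( 0L, row[0], 1L, row[1] ) );
        }
    } // close() flushes the final partial batch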
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/QueryUtils.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/QueryUtils.java
index d0429d687b..2dc27d5e51 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/QueryUtils.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/QueryUtils.java
@@ -16,23 +16,18 @@
 package org.polypheny.db.workflow.engine.storage;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 import org.polypheny.db.PolyImplementation;
 import org.polypheny.db.adapter.AdapterManager;
 import org.polypheny.db.algebra.AlgNode;
 import org.polypheny.db.algebra.AlgRoot;
+import org.polypheny.db.algebra.constant.Kind;
 import org.polypheny.db.algebra.core.common.Modify;
 import org.polypheny.db.algebra.core.common.Scan;
 import org.polypheny.db.algebra.type.AlgDataType;
-import org.polypheny.db.algebra.type.AlgDataTypeFactory;
 import org.polypheny.db.catalog.Catalog;
 import org.polypheny.db.catalog.entity.allocation.AllocationPlacement;
 import org.polypheny.db.catalog.entity.logical.LogicalCollection;
@@ -100,6 +95,19 @@ public static ExecutedContext executeQuery( Pair<ParsedQueryContext, AlgRoot> pa
     }
 
 
+    public static ExecutedContext executeAlgRoot( AlgRoot root, Statement statement ) {
+        QueryContext context = QueryContext.builder()
+                .query( "" )
+                .language( QueryLanguage.from( "SQL" ) ) // TODO: does this language matter?
+                .isAnalysed( false )
+                .origin( StorageManager.ORIGIN )
+                .transactionManager( statement.getTransaction().getTransactionManager() )
+                .transactions( List.of( statement.getTransaction() ) ).build();
+        ParsedQueryContext parsedContext = ParsedQueryContext.fromQuery( "", null, context );
+        return executeQuery( Pair.of( parsedContext, root ), statement );
+    }
+
+
     public static String quoteAndJoin( List<String> colNames ) {
         return colNames.stream()
                 .map( s -> "\"" + s + "\"" )
@@ -120,6 +128,10 @@ public static String quotedIdentifier( LogicalEntity entity ) {
 
     public static boolean validateAlg( AlgRoot root, boolean allowDml, List<LogicalEntity> allowedEntities ) {
+        if ( !(root.kind.belongsTo( Kind.QUERY ) || (allowDml && root.kind.belongsTo( Kind.DML ))) ) {
+            return false;
+        }
+
         Set<Long> allowedIds = allowedEntities == null ? null : allowedEntities.stream().map( e -> e.id ).collect( Collectors.toSet() );
         return validateRecursive( root.alg, allowDml, allowedIds );
     }
@@ -207,7 +219,7 @@ public static Transaction startTransaction( long namespace, String originSuffix
     }
 
 
-    private static long computeBatchSize( PolyValue[] representativeTuple, long maxBytesPerBatch, int maxTuplesPerBatch ) {
+    static long computeBatchSize( PolyValue[] representativeTuple, long maxBytesPerBatch, int maxTuplesPerBatch ) {
         long maxFromBytes = maxBytesPerBatch / estimateByteSize( representativeTuple );
         return Math.max( Math.min( maxFromBytes, maxTuplesPerBatch ), 1 );
     }
@@ -286,274 +298,4 @@ yield estimateByteSize( polyPath.getNodes() ) +
         };
     }
 
-
-    public static class BatchWriter implements AutoCloseable {
-
-        private static final long MAX_BYTES_PER_BATCH = 10 * 1024 * 1024L; // 10 MiB, upper limit to (estimated) size of batch in bytes
-        private static final int MAX_TUPLES_PER_BATCH = 10_000; // upper limit to tuples per batch
-
-
-        private final Map<Long, AlgDataType> paramTypes;
-        private final List<Map<Long, PolyValue>> paramValues = new ArrayList<>();
-        private long batchSize = -1;
-
-        private final Statement writeStatement;
-        private final Pair<ParsedQueryContext, AlgRoot> parsed;
-
-
-        public BatchWriter( QueryContext context, Statement statement, Map<Long, AlgDataType> paramTypes ) {
-            this.writeStatement = statement;
-            this.parsed = QueryUtils.parseAndTranslateQuery( context, writeStatement );
-            this.paramTypes = paramTypes;
-        }
-
-
-        public void write( Map<Long, PolyValue> valueMap ) {
-            if ( batchSize == -1 ) {
-                batchSize = computeBatchSize( valueMap.values().toArray( new PolyValue[0] ), MAX_BYTES_PER_BATCH, MAX_TUPLES_PER_BATCH );
-            }
-            paramValues.add( valueMap );
-
-            if ( paramValues.size() < batchSize ) {
-                return;
-            }
-            executeBatch();
-        }
-
-
-        private void executeBatch() {
-            int batchSize = paramValues.size();
-
-            writeStatement.getDataContext().setParameterTypes( paramTypes );
-            writeStatement.getDataContext().setParameterValues( paramValues );
-
-            // create new implementation for each batch
-            ExecutedContext executedContext = QueryUtils.executeQuery( parsed, writeStatement );
-
-            if ( executedContext.getException().isPresent() ) {
-                throw new GenericRuntimeException( "An error occurred while writing a batch: ", executedContext.getException().get() );
-            }
-            List<List<PolyValue>> results = executedContext.getIterator().getAllRowsAndClose();
-            long changedCount = results.size() == 1 ? results.get( 0 ).get( 0 ).asLong().longValue() : 0;
-            if ( changedCount != batchSize ) {
-                throw new GenericRuntimeException( "Unable to write all values of the batch: " + changedCount + " of " + batchSize + " tuples were written" );
-            }
-
-            paramValues.clear();
-            writeStatement.getDataContext().resetParameterValues();
-        }
-
-
-        @Override
-        public void close() throws Exception {
-            if ( !paramValues.isEmpty() ) {
-                executeBatch();
-            }
-        }
-
-    }
-
-
-    public static class RelBatchReader implements Iterator<PolyValue[]>, AutoCloseable {
-
-        static final int MAX_TUPLES_PER_BATCH = 100_000; // upper limit to tuples per batch
-
-        private final Transaction transaction;
-        private final Statement readStatement;
-        private final Pair<ParsedQueryContext, AlgRoot> parsed;
-        private final AlgDataTypeFactory typeFactory = AlgDataTypeFactory.DEFAULT;
-        private final LogicalTable table;
-        private final int sortColumnIndex; // TODO: accept multiple sort cols
-        private final boolean isUnique;
-        private final AlgDataType sortColumnType;
-
-        private Iterator<PolyValue[]> currentBatch;
-        private PolyValue lastRead = null;
-        private PolyValue[] firstRow;
-        private int currentRowCount = 0; // number of rows read in the current batch
-
-        private boolean hasNext = true;
-
-
-        public RelBatchReader( LogicalTable table, Transaction transaction, String sortColumn, boolean isUnique ) {
-            this.table = table;
-            this.transaction = transaction;
-            this.readStatement = transaction.createStatement();
-            this.isUnique = isUnique;
-            assert isUnique : "RelBatchReader currently expects a primary key column with no duplicates";
-
-            this.sortColumnIndex = table.getTupleType().getFieldNames().indexOf( sortColumn );
-            assert sortColumnIndex != -1 : "Invalid sort column";
-
-            this.sortColumnType = table.getTupleType().getFields().get( sortColumnIndex ).getType();
-
-            firstRow = readFirstRow(); // TODO: set batch size according to byte size of first row
-
-            String query = "SELECT " + quoteAndJoin( table.getColumnNames() ) + " FROM " + quotedIdentifier( table ) +
-                    " WHERE " + sortColumn + " > ?" +
-                    " ORDER BY " + sortColumn +
-                    " LIMIT " + MAX_TUPLES_PER_BATCH;
-
-            QueryContext context = QueryContext.builder()
-                    .query( query )
-                    .language( QueryLanguage.from( "SQL" ) )
-                    .isAnalysed( false )
-                    .origin( StorageManager.ORIGIN )
-                    .namespaceId( table.getNamespaceId() )
-                    .transactionManager( transaction.getTransactionManager() )
-                    .transactions( List.of( transaction ) ).build();
-
-            this.parsed = QueryUtils.parseAndTranslateQuery( context, readStatement );
-        }
-
-
-        @Override
-        public boolean hasNext() {
-            return hasNext;
-        }
-
-
-        @Override
-        public PolyValue[] next() {
-            assert hasNext;
-
-            if ( lastRead == null ) {
-                lastRead = firstRow[sortColumnIndex];
-                readBatch();
-                return firstRow;
-            }
-
-            PolyValue[] row = currentBatch.next();
-            currentRowCount++;
-
-            if ( !currentBatch.hasNext() ) {
-                if ( currentRowCount < MAX_TUPLES_PER_BATCH ) {
-                    hasNext = false;
-                    closeIterator();
-                } else {
-                    lastRead = row[sortColumnIndex];
-                    readBatch();
-                }
-            }
-
-            return row;
-        }
-
-
-        /**
-         * Either sets the currentBatch iterator to the next batch if there are still rows to read
-         * or sets hasNext to false and closes the iterator.
-         */
-        private void readBatch() {
-            readStatement.getDataContext().addParameterValues( 0, sortColumnType, List.of( lastRead ) );
-
-            ExecutedContext executedContext = QueryUtils.executeQuery( parsed, readStatement );
-            readStatement.getDataContext().resetParameterValues();
-
-            if ( executedContext.getException().isPresent() ) {
-                throw new GenericRuntimeException( "An error occurred while reading a batch: ", executedContext.getException().get() );
-            }
-
-            currentBatch = executedContext.getIterator().getIterator();
-            if ( !currentBatch.hasNext() ) {
-                hasNext = false;
-                closeIterator();
-            }
-            currentRowCount = 0;
-        }
-
-
-        private PolyValue[] readFirstRow() {
-            String query = "SELECT " + quoteAndJoin( table.getColumnNames() ) + " FROM " + quotedIdentifier( table ) +
-                    " ORDER BY " + table.getColumnNames().get( sortColumnIndex ) +
-                    " LIMIT 1";
-
-            QueryContext context = QueryContext.builder()
-                    .query( query )
-                    .language( QueryLanguage.from( "SQL" ) )
-                    .isAnalysed( false )
-                    .origin( StorageManager.ORIGIN )
-                    .namespaceId( table.getNamespaceId() )
-                    .transactionManager( transaction.getTransactionManager() )
-                    .transactions( List.of( transaction ) ).build();
-
-            Statement statement = transaction.createStatement();
-
-            ExecutedContext executedContext = QueryUtils.executeQuery( QueryUtils.parseAndTranslateQuery( context, statement ), statement );
-            if ( executedContext.getException().isPresent() ) {
-                throw new GenericRuntimeException( "An error occurred while reading the first row: ", executedContext.getException().get() );
-            }
-            List<List<PolyValue>> values = executedContext.getIterator().getAllRowsAndClose();
-            if ( values.isEmpty() ) {
-                hasNext = false;
-                return null;
-            }
-            return values.get( 0 ).toArray( new PolyValue[0] );
-        }
-
-
-        private void closeIterator() {
-            if ( currentBatch == null ) {
-                return;
-            }
-            try {
-                if ( currentBatch instanceof AutoCloseable ) {
-                    ((AutoCloseable) currentBatch).close();
-                }
-            } catch ( Exception ignored ) {
-            }
-            currentBatch = null;
-        }
-
-
-        @Override
-        public void close() throws Exception {
-            closeIterator();
-        }
-
-    }
-
-
-    public static class AsyncRelBatchReader extends RelBatchReader {
-
-        private final BlockingQueue<PolyValue[]> queue;
-        private final Thread t;
-
-
-        public AsyncRelBatchReader( LogicalTable table, Transaction transaction, String sortColumn, boolean isUnique ) {
-            super( table, transaction, sortColumn, isUnique );
-            queue = new LinkedBlockingQueue<>( 2 * MAX_TUPLES_PER_BATCH );
-
-            t = new Thread( () -> {
-                while ( super.hasNext() ) {
-                    PolyValue[] row = super.next();
-                    try {
-                        queue.put( row );
-                    } catch ( InterruptedException e ) {
-                        throw new RuntimeException( e );
-                    }
-                }
-            } );
-            t.start();
-        }
-
-
-        @Override
-        public boolean hasNext() {
-            return super.hasNext() || !queue.isEmpty();
-        }
-
-
-        @Override
-        public PolyValue[] next() {
-            try {
-                return queue.take();
-            } catch ( InterruptedException e ) {
-                throw new RuntimeException( e );
-            }
-        }
-
-    }
-
 }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/RelBatchReader.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/RelBatchReader.java
new file mode 100644
index 0000000000..c4d85a6796
--- /dev/null
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/RelBatchReader.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2019-2024 The Polypheny Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.polypheny.db.workflow.engine.storage;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.polypheny.db.algebra.AlgRoot;
+import org.polypheny.db.algebra.type.AlgDataType;
+import org.polypheny.db.algebra.type.AlgDataTypeFactory;
+import org.polypheny.db.catalog.entity.logical.LogicalTable;
+import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
+import org.polypheny.db.languages.QueryLanguage;
+import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
+import org.polypheny.db.processing.QueryContext;
+import org.polypheny.db.processing.QueryContext.ParsedQueryContext;
+import org.polypheny.db.transaction.Statement;
+import org.polypheny.db.transaction.Transaction;
+import org.polypheny.db.type.entity.PolyValue;
+import org.polypheny.db.util.Pair;
+
+/**
+ * Reads a table in batches of up to {@link #MAX_TUPLES_PER_BATCH} tuples using keyset pagination:
+ * each batch resumes strictly after the sort key of the last tuple that was read.
+ */
+public class RelBatchReader implements Iterator<PolyValue[]>, AutoCloseable {
+
+    static final int MAX_TUPLES_PER_BATCH = 100_000; // upper limit to tuples per batch
+
+    private final Transaction transaction;
+    private final Statement readStatement;
+    private final Pair<ParsedQueryContext, AlgRoot> parsed;
+    private final AlgDataTypeFactory typeFactory = AlgDataTypeFactory.DEFAULT;
+    private final LogicalTable table;
+    private final int sortColumnIndex; // TODO: accept multiple sort cols
+    private final boolean isUnique;
+    private final AlgDataType sortColumnType;
+
+    private Iterator<PolyValue[]> currentBatch;
+    private PolyValue lastRead = null;
+    private PolyValue[] firstRow;
+    private int currentRowCount = 0; // number of rows read in the current batch
+
+    private boolean hasNext = true;
+
+
+    public RelBatchReader( LogicalTable table, Transaction transaction, String sortColumn, boolean isUnique ) {
+        this.table = table;
+        this.transaction = transaction;
+        this.readStatement = transaction.createStatement();
+        this.isUnique = isUnique;
+        assert isUnique : "RelBatchReader currently expects a primary key column with no duplicates";
+
+        this.sortColumnIndex = table.getTupleType().getFieldNames().indexOf( sortColumn );
+        assert sortColumnIndex != -1 : "Invalid sort column";
+
+        this.sortColumnType = table.getTupleType().getFields().get( sortColumnIndex ).getType();
+
+        firstRow = readFirstRow(); // TODO: set batch size according to byte size of first row
+
+        String query = "SELECT " + QueryUtils.quoteAndJoin( table.getColumnNames() ) + " FROM " + QueryUtils.quotedIdentifier( table )
+                + " WHERE " + sortColumn + " > ?"
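+                // keyset pagination: the dynamic parameter holds the last sort key read,
+                // so each batch resumes strictly after the previous one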
+ + " ORDER BY " + sortColumn + + " LIMIT " + MAX_TUPLES_PER_BATCH; + + QueryContext context = QueryContext.builder() + .query( query ) + .language( QueryLanguage.from( "SQL" ) ) + .isAnalysed( false ) + .origin( StorageManager.ORIGIN ) + .namespaceId( table.getNamespaceId() ) + .transactionManager( transaction.getTransactionManager() ) + .transactions( List.of( transaction ) ).build(); + + this.parsed = QueryUtils.parseAndTranslateQuery( context, readStatement ); + } + + + @Override + public boolean hasNext() { + return hasNext; + } + + + @Override + public PolyValue[] next() { + assert hasNext; + + if ( lastRead == null ) { + lastRead = firstRow[sortColumnIndex]; + readBatch(); + return firstRow; + } + + PolyValue[] row = currentBatch.next(); + currentRowCount++; + + if ( !currentBatch.hasNext() ) { + if ( currentRowCount < MAX_TUPLES_PER_BATCH ) { + hasNext = false; + closeIterator(); + } else { + lastRead = row[sortColumnIndex]; + readBatch(); + } + } + + return row; + } + + + /** + * Either sets the currentBatch iterator to the next batch if there are still rows to read + * or sets hasNext to false and closes the iterator. + */ + private void readBatch() { + readStatement.getDataContext().addParameterValues( 0, sortColumnType, List.of( lastRead ) ); + + ExecutedContext executedContext = QueryUtils.executeQuery( parsed, readStatement ); + readStatement.getDataContext().resetParameterValues(); + + if ( executedContext.getException().isPresent() ) { + throw new GenericRuntimeException( "An error occurred while reading a batch: ", executedContext.getException().get() ); + } + + currentBatch = executedContext.getIterator().getIterator(); + if ( !currentBatch.hasNext() ) { + hasNext = false; + closeIterator(); + } + currentRowCount = 0; + } + + + private PolyValue[] readFirstRow() { + String query = "SELECT " + QueryUtils.quoteAndJoin( table.getColumnNames() ) + " FROM " + QueryUtils.quotedIdentifier( table ) + + " ORDER BY " + table.getColumnNames().get( sortColumnIndex ) + + " LIMIT 1"; + + QueryContext context = QueryContext.builder() + .query( query ) + .language( QueryLanguage.from( "SQL" ) ) + .isAnalysed( false ) + .origin( StorageManager.ORIGIN ) + .namespaceId( table.getNamespaceId() ) + .transactionManager( transaction.getTransactionManager() ) + .transactions( List.of( transaction ) ).build(); + + Statement statement = transaction.createStatement(); + + ExecutedContext executedContext = QueryUtils.executeQuery( QueryUtils.parseAndTranslateQuery( context, statement ), statement ); + if ( executedContext.getException().isPresent() ) { + throw new GenericRuntimeException( "An error occurred while reading the first row: ", executedContext.getException().get() ); + } + List> values = executedContext.getIterator().getAllRowsAndClose(); + if ( values.isEmpty() ) { + hasNext = false; + return null; + } + return values.get( 0 ).toArray( new PolyValue[0] ); + } + + + private void closeIterator() { + if ( currentBatch == null ) { + return; + } + try { + if ( currentBatch instanceof AutoCloseable ) { + ((AutoCloseable) currentBatch).close(); + } + } catch ( Exception ignored ) { + } + currentBatch = null; + } + + + @Override + public void close() throws Exception { + closeIterator(); + } + + + public static class AsyncRelBatchReader extends RelBatchReader { + + private final BlockingQueue queue; + private final Thread t; + + + public AsyncRelBatchReader( LogicalTable table, Transaction transaction, String sortColumn, boolean isUnique ) { + super( table, transaction, sortColumn, isUnique ); + 
+            queue = new LinkedBlockingQueue<>( 2 * MAX_TUPLES_PER_BATCH );
+
+            t = new Thread( () -> {
+                try {
+                    while ( super.hasNext() ) {
+                        queue.put( super.next() );
+                    }
+                    queue.put( POISON ); // without a sentinel, a row still in flight could be missed by the consumer
+                } catch ( InterruptedException e ) {
+                    throw new RuntimeException( e );
+                }
+            } );
+            t.start();
+        }
+
+
+        @Override
+        public boolean hasNext() {
+            if ( lookahead == null ) {
+                try {
+                    lookahead = queue.take(); // blocks until the producer delivers a row or the sentinel
+                } catch ( InterruptedException e ) {
+                    throw new RuntimeException( e );
+                }
+            }
+            return lookahead != POISON;
+        }
+
+
+        @Override
+        public PolyValue[] next() {
+            assert hasNext();
+            PolyValue[] row = lookahead;
+            lookahead = null;
+            return row;
+        }
+
+
+        @Override
+        public void close() throws Exception {
+            t.interrupt(); // stop the producer if it is still reading
+            super.close();
+        }
+
+    }
+
+}
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java
index 03cfc21928..9d3ad662ba 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/StorageManagerImpl.java
@@ -287,6 +287,7 @@ public void rollbackTransaction( UUID activityId ) {
 
     @Override
     public void startCommonTransactions() {
+        // TODO: call this method at the correct time
         extractTransaction = QueryUtils.startTransaction( Catalog.defaultNamespaceId );
         loadTransaction = QueryUtils.startTransaction( Catalog.defaultNamespaceId );
     }
@@ -343,7 +344,7 @@ private void dropEntity( LogicalEntity entity ) {
     private void dropNamespaces() {
         Transaction transaction = QueryUtils.startTransaction( relNamespace, "DropNamespaces" );
         for ( String ns : registeredNamespaces ) {
-            ddlManager.dropNamespace( ns, false, transaction.createStatement() );
+            ddlManager.dropNamespace( ns, true, transaction.createStatement() );
         }
         transaction.commit();
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/CheckpointReader.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/CheckpointReader.java
index 0cce3ceada..3ce46a5736 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/CheckpointReader.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/CheckpointReader.java
@@ -93,9 +93,9 @@ public final Iterator<List<PolyValue>> getIterator( boolean fixedSize ) {
      * For user defined input, it is advised to use dynamic parameters in the CheckpointQuery, to avoid SQL injections.
      *
      * @param query The CheckpointQuery to be executed.
-     * @return An iterator of the query result.
+     * @return the result tuple type and an iterator of the query result
      */
-    public Iterator<List<PolyValue>> getIteratorFromQuery( CheckpointQuery query ) {
+    public Pair<AlgDataType, Iterator<List<PolyValue>>> getIteratorFromQuery( CheckpointQuery query ) {
         return getIteratorFromQuery( query, List.of( this ) );
     }
 
@@ -156,9 +156,9 @@ public List<PolyValue> next() {
      *
      * @param query The CheckpointQuery to be executed.
     * @param inputs The readers whose checkpoints can be used in the query. The index of a reader in this list corresponds to the placeholder index in the CheckpointQuery.
-     * @return An iterator of the query result.
+     * @return the result tuple type and an iterator of the query result
      */
-    public Iterator<List<PolyValue>> getIteratorFromQuery( CheckpointQuery query, List<CheckpointReader> inputs ) {
+    public Pair<AlgDataType, Iterator<List<PolyValue>>> getIteratorFromQuery( CheckpointQuery query, List<CheckpointReader> inputs ) {
         assert inputs.contains( this );
 
         List<LogicalEntity> entities = inputs.stream().map( reader -> reader.entity ).toList();
@@ -191,7 +191,7 @@ public Iterator<List<PolyValue>> getIteratorFromQuery( CheckpointQuery query, List<CheckpointReader> inputs ) {
 
         Iterator<PolyValue[]> iterator = executedContext.getIterator().getIterator();
         registerIterator( iterator );
-        return arrayToListIterator( iterator, false );
+        return Pair.of( executedContext.getIterator().getRowType(), arrayToListIterator( iterator, false ) );
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/DocReader.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/DocReader.java
index 8fda779231..0649f25d43 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/DocReader.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/DocReader.java
@@ -17,7 +17,6 @@
 package org.polypheny.db.workflow.engine.storage.reader;
 
 import java.util.Iterator;
-import java.util.List;
 import org.apache.commons.lang3.NotImplementedException;
 import org.polypheny.db.algebra.AlgNode;
 import org.polypheny.db.catalog.entity.logical.LogicalCollection;
@@ -44,12 +43,6 @@ public Iterator<PolyValue[]> getArrayIterator() {
     }
 
 
-    @Override
-    public Iterator<List<PolyValue>> getIteratorFromQuery( CheckpointQuery query ) {
-        throw new NotImplementedException();
-    }
-
-
     private LogicalCollection getCollection() {
         return (LogicalCollection) entity;
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/LpgReader.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/LpgReader.java
index e5e6d5cc1b..1c81e64d50 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/LpgReader.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/reader/LpgReader.java
@@ -17,7 +17,6 @@
 package org.polypheny.db.workflow.engine.storage.reader;
 
 import java.util.Iterator;
-import java.util.List;
 import org.apache.commons.lang3.NotImplementedException;
 import org.polypheny.db.algebra.AlgNode;
 import org.polypheny.db.catalog.entity.logical.LogicalGraph;
@@ -44,12 +43,6 @@ public Iterator<PolyValue[]> getArrayIterator() {
     }
 
 
-    @Override
-    public Iterator<List<PolyValue>> getIteratorFromQuery( CheckpointQuery query ) {
-        throw new NotImplementedException();
-    }
-
-
     private LogicalGraph getGraph() {
         return (LogicalGraph) entity;
     }
diff --git a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/RelWriter.java b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/RelWriter.java
index 4e57098125..ec0c8b0cdb 100644
--- a/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/RelWriter.java
+++ b/plugins/workflow-engine/src/main/java/org/polypheny/db/workflow/engine/storage/writer/RelWriter.java
@@ -27,7 +27,7 @@
 import org.polypheny.db.transaction.Transaction;
 import org.polypheny.db.type.entity.PolyValue;
 import org.polypheny.db.type.entity.numerical.PolyLong;
-import org.polypheny.db.workflow.engine.storage.QueryUtils.BatchWriter;
+import org.polypheny.db.workflow.engine.storage.BatchWriter;
 import org.polypheny.db.workflow.engine.storage.StorageManager;
 
 public class RelWriter extends CheckpointWriter {
diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java
index 9e2a017d3e..bd904d1a1c 100644
--- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java
+++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/WorkflowUtils.java
@@ -24,10 +24,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.polypheny.db.catalog.logistic.DataModel;
+import org.polypheny.db.util.Pair;
 import org.polypheny.db.util.graph.TopologicalOrderIterator;
 import org.polypheny.db.workflow.dag.Workflow;
 import org.polypheny.db.workflow.dag.WorkflowImpl;
 import org.polypheny.db.workflow.dag.activities.ActivityRegistry;
+import org.polypheny.db.workflow.dag.settings.EntityValue;
+import org.polypheny.db.workflow.engine.storage.StorageUtils;
 import org.polypheny.db.workflow.models.ActivityModel;
 import org.polypheny.db.workflow.models.EdgeModel;
 import org.polypheny.db.workflow.models.WorkflowConfigModel;
@@ -38,8 +42,13 @@ public class WorkflowUtils {
 
     private static ObjectMapper mapper = new ObjectMapper();
 
-    public static Workflow getWorkflow( List<ActivityModel> activities, List<EdgeModel> edges ) {
-        WorkflowConfigModel config = WorkflowConfigModel.of();
+    public static Workflow getWorkflow( List<ActivityModel> activities, List<EdgeModel> edges, boolean fusionEnabled, boolean pipelineEnabled, int maxWorkers ) {
+        WorkflowConfigModel config = new WorkflowConfigModel(
+                Map.of( DataModel.RELATIONAL, "hsqldb", DataModel.DOCUMENT, "hsqldb", DataModel.GRAPH, "hsqldb" ),
+                fusionEnabled,
+                pipelineEnabled,
+                maxWorkers
+        );
         return WorkflowImpl.fromModel( new WorkflowModel( activities, edges, config, null ) );
     }
 
@@ -52,7 +61,7 @@ public static Workflow getWorkflow1() {
         List<EdgeModel> edges = List.of(
                 EdgeModel.of( activities.get( 0 ), activities.get( 1 ) )
         );
-        return getWorkflow( activities, edges );
+        return getWorkflow( activities, edges, false, false, 1 );
     }
 
 
@@ -67,7 +76,7 @@ public static Workflow getUnionWorkflow() {
                 EdgeModel.of( activities.get( 1 ), activities.get( 2 ), 1 ),
                 EdgeModel.of( activities.get( 0 ), activities.get( 1 ), true ) // ensure consistent ordering
         );
-        return getWorkflow( activities, edges );
+        return getWorkflow( activities, edges, false, false, 1 );
     }
 
 
@@ -87,7 +96,69 @@ public static Workflow getMergeWorkflow( boolean simulateFailure ) {
                 EdgeModel.of( activities.get( 2 ), activities.get( 3 ), 1 ),
                 EdgeModel.of( activities.get( 0 ), activities.get( 1 ), true ) // ensure consistent ordering
         );
-        return getWorkflow( activities, edges );
+        return getWorkflow( activities, edges, false, false, 1 );
+    }
+
+
+    public static Workflow getSimpleFusion() {
+        List<ActivityModel> activities = List.of(
+                new ActivityModel( "relValues" ),
+                new ActivityModel( "identity" )
+        );
+        List<EdgeModel> edges = List.of(
+                EdgeModel.of( activities.get( 0 ), activities.get( 1 ), 0 )
+        );
+        return getWorkflow( activities, edges, true, false, 1 );
+    }
+
+
+    public static Pair<Workflow, List<UUID>> getAdvancedFusion() {
+        // 0 - 3 - 4 should fuse
+        List<ActivityModel> activities = List.of(
+                new ActivityModel( "relExtract", Map.of( "table", getEntitySetting( "public", StorageUtils.REL_TABLE ) ) ),
+                new ActivityModel( "relExtract", Map.of( "table", getEntitySetting( "public", StorageUtils.REL_TABLE ) ) ),
+                new ActivityModel( "debug" ), // -> cannot fuse
+                new ActivityModel( "relUnion" ),
+                new ActivityModel( "identity" )
+        );
+        List<EdgeModel> edges = List.of(
+                EdgeModel.of( activities.get( 0 ), activities.get( 3 ), 0 ),
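+                // 1 -> 2 -> 3 routes through the debug activity, which blocks fusion on that branch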
+                EdgeModel.of( activities.get( 1 ), activities.get( 2 ), 0 ),
+                EdgeModel.of( activities.get( 2 ), activities.get( 3 ), 1 ),
+                EdgeModel.of( activities.get( 3 ), activities.get( 4 ), 0 )
+        );
+        return Pair.of(
+                getWorkflow( activities, edges, true, false, 1 ),
+                activities.stream().map( ActivityModel::getId ).toList() ); // also return ids, since topological order is not stable
+    }
+
+
+    public static Workflow getRelValuesFusion() {
+        // 0 - 3 should fuse
+        List<ActivityModel> activities = List.of(
+                new ActivityModel( "relValues" ),
+                new ActivityModel( "relValues" ),
+                new ActivityModel( "debug" ), // -> cannot fuse
+                new ActivityModel( "relUnion" )
+        );
+        List<EdgeModel> edges = List.of(
+                EdgeModel.of( activities.get( 0 ), activities.get( 3 ), 0 ),
+                EdgeModel.of( activities.get( 1 ), activities.get( 2 ), 0 ),
+                EdgeModel.of( activities.get( 2 ), activities.get( 3 ), 1 )
+        );
+        return getWorkflow( activities, edges, true, false, 1 );
+    }
+
+
+    public static Workflow getExtractWorkflow() {
+        List<ActivityModel> activities = List.of(
+                new ActivityModel( "relExtract", Map.of( "table", getEntitySetting( "public", StorageUtils.REL_TABLE ) ) ),
+                new ActivityModel( "identity" )
+        );
+        List<EdgeModel> edges = List.of(
+                EdgeModel.of( activities.get( 0 ), activities.get( 1 ), 0 )
+        );
+        return getWorkflow( activities, edges, true, false, 1 );
     }
 
@@ -99,4 +170,9 @@ public static List<UUID> getTopologicalActivityIds( Workflow workflow ) {
         return list;
     }
 
+
+    public static JsonNode getEntitySetting( String namespace, String name ) {
+        return new EntityValue( namespace, name ).toJson( mapper );
+    }
+
 }
diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/dag/activities/ActivityRegistryTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/dag/activities/ActivityRegistryTest.java
index 57f22ab402..1581610587 100644
--- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/dag/activities/ActivityRegistryTest.java
+++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/dag/activities/ActivityRegistryTest.java
@@ -23,7 +23,7 @@
 import static org.junit.jupiter.api.Assertions.fail;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.IntNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.sql.SQLException;
@@ -141,7 +141,7 @@ public void buildDefaultSettingValuesTest() throws InvalidSettingException {
         Map rebuiltSettings = ActivityRegistry.buildSettingValues( key, defaultSettings, true ).getMap();
         assertEquals( defaultSettings.size(), rebuiltSettings.size() );
 
-        JsonMapper mapper = new JsonMapper();
+        ObjectMapper mapper = new ObjectMapper();
         for ( Entry entry : rebuiltSettings.entrySet() ) {
             JsonNode rebuiltJson = entry.getValue().toJson( mapper );
             assertEquals( defaultSettings.get( entry.getKey() ), rebuiltJson );
@@ -158,7 +158,7 @@ public void intVariableResolveTest() throws InvalidSettingException {
         String activity = "identity";
         String settingKey = "I1";
 
-        JsonMapper mapper = new JsonMapper();
+        ObjectMapper mapper = new ObjectMapper();
         VariableStore vStore = new VariableStore();
         vStore.setVariable( varName, IntNode.valueOf( newValue ) );
diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java
index b8d4e9f6bd..d90d8760a4 100644
--- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java
+++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/scheduler/GlobalSchedulerTest.java
@@ -21,11 +21,14 @@
 import java.sql.SQLException;
 import java.util.List;
 import java.util.UUID;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.polypheny.db.TestHelper;
+import org.polypheny.db.util.Pair;
 import org.polypheny.db.workflow.WorkflowUtils;
 import org.polypheny.db.workflow.dag.Workflow;
 import org.polypheny.db.workflow.engine.storage.StorageManager;
@@ -44,6 +47,7 @@ class GlobalSchedulerTest {
     public static void start() throws SQLException {
         testHelper = TestHelper.getInstance();
         StorageUtils.addHsqldbLocksStore( "locks" );
+        StorageUtils.addRelData();
         scheduler = GlobalScheduler.getInstance();
     }
 
@@ -62,6 +66,13 @@ public void cleanup() {
         } catch ( Exception e ) {
             throw new RuntimeException( e );
         }
+        testHelper.checkAllTrxClosed();
+    }
+
+
+    @AfterAll
+    public static void tearDown() {
+        StorageUtils.dropData();
     }
 
@@ -73,7 +84,6 @@ void executeSimpleWorkflowTest() throws Exception {
         scheduler.awaitResultProcessor( 5000 );
 
         System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 1 ), 0 ) );
-        testHelper.checkAllTrxClosed();
     }
 
@@ -87,7 +97,6 @@ void executeUnionWorkflowTest() throws Exception {
         System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 0 ), 0 ) );
         System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 1 ), 0 ) );
         System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 2 ), 0 ) );
-        testHelper.checkAllTrxClosed();
     }
 
@@ -103,7 +112,6 @@ void executeMergeWorkflowTest() throws Exception {
         System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 1 ), 0 ) );
         assertFalse( sm.hasCheckpoint( ids.get( 2 ), 0 ) );
         System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 3 ), 0 ) );
-        testHelper.checkAllTrxClosed();
     }
 
@@ -119,4 +127,55 @@ void executeWorkflowInStepsTest() throws Exception {
         testHelper.checkAllTrxClosed();
     }
 
+
+    @Test
+    @Disabled
+    void simpleFusionTest() throws Exception {
+        Workflow workflow = WorkflowUtils.getSimpleFusion();
+        List<UUID> ids = WorkflowUtils.getTopologicalActivityIds( workflow );
+        scheduler.startExecution( workflow, sm, ids.get( 1 ) );
+        scheduler.awaitResultProcessor( 5000 );
+        assertFalse( sm.hasCheckpoint( ids.get( 0 ), 0 ) );
+        System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 1 ), 0 ) );
+    }
+
+
+    @Test
+    void advancedFusionTest() throws Exception {
+        Pair<Workflow, List<UUID>> pair = WorkflowUtils.getAdvancedFusion();
+        List<UUID> ids = pair.right;
+        scheduler.startExecution( pair.left, sm, ids.get( ids.size() - 1 ) );
+        scheduler.awaitResultProcessor( 5000 );
+
+        assertFalse( sm.hasCheckpoint( ids.get( 0 ), 0 ) );
+        System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 1 ), 0 ) );
+        System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 2 ), 0 ) );
+        assertFalse( sm.hasCheckpoint( ids.get( 3 ), 0 ) );
+        System.out.println( StorageUtils.readCheckpoint( sm, ids.get( 4 ), 0 ) );
+    }
+
+
+    @Test
+    @Disabled
+    void relValuesFusionTest() throws Exception {
+        Workflow workflow = WorkflowUtils.getRelValuesFusion();
+        List<UUID> ids = WorkflowUtils.getTopologicalActivityIds( workflow );
+        scheduler.startExecution( workflow, sm, ids.get( ids.size() - 1 ) );
+        scheduler.awaitResultProcessor( 5000 );
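+        // read back the checkpoint of the final union activity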
+        System.out.println( StorageUtils.readCheckpoint( sm, ids.get( ids.size() - 1 ), 0 ) );
+    }
+
+
+    @Test
+    void relExtractTest() throws Exception {
+        Workflow workflow = WorkflowUtils.getExtractWorkflow();
+        List<UUID> ids = WorkflowUtils.getTopologicalActivityIds( workflow );
+        scheduler.startExecution( workflow, sm, ids.get( ids.size() - 1 ) );
+        scheduler.awaitResultProcessor( 5000 );
+        assertFalse( sm.hasCheckpoint( ids.get( 0 ), 0 ) );
+        System.out.println( StorageUtils.readCheckpoint( sm, ids.get( ids.size() - 1 ), 0 ) );
+    }
+
 }
diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageManagerTest.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageManagerTest.java
index 884f24a24f..cc4109168b 100644
--- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageManagerTest.java
+++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageManagerTest.java
@@ -173,7 +173,7 @@ void readQueryResultFromRelCheckpointTest() throws Exception {
         try ( RelReader reader = (RelReader) sm.readCheckpoint( activityId, 0 ) ) {
             int i = sampleData.size() - 1; // reverse order because of DESC
-            Iterator<List<PolyValue>> it = reader.getIteratorFromQuery( query );
+            Iterator<List<PolyValue>> it = reader.getIteratorFromQuery( query ).right;
             while ( it.hasNext() ) {
                 assertTupleEquals( sampleData.get( i ), it.next() );
                 i--;
diff --git a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageUtils.java b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageUtils.java
index 2c5f9649fd..57cfedc2bc 100644
--- a/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageUtils.java
+++ b/plugins/workflow-engine/src/test/java/org/polypheny/db/workflow/engine/storage/StorageUtils.java
@@ -35,6 +35,8 @@ public class StorageUtils {
     public static final String HSQLDB_LOCKS = "hsqldb_locks";
     public static final String HSQLDB_MVLOCKS = "hsqldb_mvlocks";
 
+    public static final String REL_TABLE = "rel_data";
+
     public static void addHsqldbStore( String name, String trxControlMode ) throws SQLException {
         TestHelper.executeSQL( "ALTER ADAPTERS ADD \"%s\" USING 'Hsqldb' AS 'Store'".formatted( name )
@@ -80,4 +82,35 @@ public static List<List<PolyValue>> readCheckpoint( StorageManager sm, UUID activityId
         return list;
     }
 
+
+    public static void addRelData() {
+        try ( JdbcConnection jdbcConnection = new JdbcConnection( false ) ) {
+            Connection connection = jdbcConnection.getConnection();
+            try ( Statement statement = connection.createStatement() ) {
+                statement.executeUpdate( "CREATE TABLE rel_data( id INTEGER NOT NULL, name VARCHAR(39), foo INTEGER, PRIMARY KEY (id))" );
+                statement.executeUpdate( "INSERT INTO rel_data VALUES (1, 'Hans', 5)" );
+                statement.executeUpdate( "INSERT INTO rel_data VALUES (2, 'Alice', 7)" );
+                statement.executeUpdate( "INSERT INTO rel_data VALUES (3, 'Bob', 4)" );
+                statement.executeUpdate( "INSERT INTO rel_data VALUES (4, 'Saskia', 6)" );
+                statement.executeUpdate( "INSERT INTO rel_data VALUES (5, 'Rebecca', 3)" );
+                statement.executeUpdate( "INSERT INTO rel_data VALUES (6, 'Georg', 9)" );
+                connection.commit();
+            }
+        } catch ( SQLException e ) {
+            throw new RuntimeException( e );
+        }
+    }
+
+
+    public static void dropData() {
+        try ( JdbcConnection jdbcConnection = new JdbcConnection( true ) ) {
+            Connection connection = jdbcConnection.getConnection();
+            try ( Statement statement = connection.createStatement() ) {
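+                // remove the shared test table created by addRelData()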
statement.executeUpdate( "DROP TABLE rel_data" ); + } + } catch ( SQLException e ) { + throw new RuntimeException( e ); + } + } + }