Skip to content

Commit

Permalink
Get activity fusion working
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 10, 2024
1 parent 0b1d8c4 commit 0d1ff95
Show file tree
Hide file tree
Showing 41 changed files with 748 additions and 397 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.javalin.http.ContentType;
import io.javalin.http.Context;
Expand Down Expand Up @@ -91,7 +90,7 @@ private void createExecuteDummyWorkflowTest() {
server.addSerializedRoute( PATH + "/executeDummy/{namespaceName}/{tableName}/{storeName}", ctx -> {
System.out.println( "handling dummy execution..." );

JsonMapper mapper = new JsonMapper();
ObjectMapper mapper = new ObjectMapper();
ObjectNode setting = mapper.createObjectNode();
setting.put( "namespace", ctx.pathParam( "namespaceName" ) );
setting.put( "name", ctx.pathParam( "tableName" ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ public class WorkflowImpl implements Workflow {


public WorkflowImpl() {
this.activities = new ConcurrentHashMap<>();
this.edges = new ConcurrentHashMap<>();
this.config = WorkflowConfigModel.of();
this( new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), WorkflowConfigModel.of() );
}


private WorkflowImpl( Map<UUID, ActivityWrapper> activities, Map<Pair<UUID, UUID>, List<Edge>> edges, WorkflowConfigModel config ) {
this.activities = activities;
this.edges = edges;
this.config = config;

// TODO: compute previews & variables
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ public DataModel getDataModel() {
case LPG -> DataModel.GRAPH;
};
}


public static PortType fromDataModel( DataModel model ) {
return switch ( model ) {
case RELATIONAL -> REL;
case DOCUMENT -> DOC;
case GRAPH -> LPG;
};
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@

import java.util.List;
import java.util.Optional;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.logical.relational.LogicalRelProject;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.Fusable;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
Expand Down Expand Up @@ -50,7 +54,7 @@
)
@IntSetting(key = "I2", displayName = "THIRD", defaultValue = 0, isList = true, group = "groupA")
@StringSetting(key = "S2", displayName = "FOURTH", defaultValue = "test", isList = true, group = "groupA", subGroup = "a")
public class IdentityActivity implements Activity {
public class IdentityActivity implements Activity, Fusable {


public IdentityActivity() {
Expand All @@ -72,6 +76,13 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
}


@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
// to make it more interesting, we add a project activity that doesn't change the tupleType
return LogicalRelProject.identity( inputs.get( 0 ) );
}


@Override
public void reset() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@

import static org.polypheny.db.workflow.dag.activities.impl.RelExtractActivity.TABLE_KEY;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;
import org.apache.commons.lang3.NotImplementedException;
import org.polypheny.db.ResultIterator;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.logical.relational.LogicalRelProject;
import org.polypheny.db.algebra.logical.relational.LogicalRelScan;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeFactory;
import org.polypheny.db.algebra.type.AlgDataTypeField;
Expand All @@ -32,8 +36,12 @@
import org.polypheny.db.languages.LanguageManager;
import org.polypheny.db.languages.QueryLanguage;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
import org.polypheny.db.processing.QueryContext;
import org.polypheny.db.rex.RexIndexRef;
import org.polypheny.db.rex.RexNode;
import org.polypheny.db.schema.trait.ModelTrait;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.type.PolyType;
import org.polypheny.db.workflow.dag.activities.Activity;
Expand Down Expand Up @@ -144,13 +152,18 @@ public void reset() {

@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
throw new NotImplementedException();
/*LogicalTable table = getEntity( settings.get( TABLE_KEY ) );
AlgDataType type = getOutputType( table );
LogicalTable table = settings.get( TABLE_KEY, EntityValue.class ).getTable();
AlgTraitSet traits = AlgTraitSet.createEmpty().plus( ModelTrait.RELATIONAL );

AlgNode scan = new LogicalRelScan( cluster, traits, table );
return LogicalRelProject.create( scan, , );*/
List<RexNode> projects = new ArrayList<>();
projects.add( cluster.getRexBuilder().makeBigintLiteral( new BigDecimal( 0 ) ) ); // Add new PK col
IntStream.range( 0, table.getTupleType().getFieldCount() )
.mapToObj( i ->
new RexIndexRef( i, table.getTupleType().getFields().get( i ).getType() )
).forEach( projects::add );

return LogicalRelProject.create( scan, projects, getOutputType( table ) );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.polypheny.db.workflow.engine.execution.context.PipeExecutionContext;
import org.polypheny.db.workflow.engine.execution.pipe.InputPipe;
import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe;
import org.polypheny.db.workflow.engine.storage.QueryUtils.BatchWriter;
import org.polypheny.db.workflow.engine.storage.BatchWriter;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.writer.RelWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package org.polypheny.db.workflow.dag.activities.impl;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.util.Pair;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidInputException;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
Expand All @@ -50,10 +52,7 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>
return List.of( Optional.empty() );
}

if ( !firstType.get().equals( secondType.get() ) ) {
throw new InvalidInputException( "The second input type is not equal to the first input type", 1 );
}
return List.of( firstType );
return Activity.wrapType( RelUnionActivity.getTypeOrThrow( List.of( firstType.get(), secondType.get() ) ) );
}


Expand All @@ -73,8 +72,9 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
.queryLanguage( "SQL" )
.query( "SELECT * FROM " + CheckpointQuery.ENTITY( 0 ) + " UNION ALL SELECT * FROM " + CheckpointQuery.ENTITY( 1 ) )
.build();
try ( CheckpointWriter writer = ctx.createRelWriter( 0, input0.getTupleType(), true ) ) {
writer.write( input0.getIteratorFromQuery( query, inputs ) );
Pair<AlgDataType, Iterator<List<PolyValue>>> result = input0.getIteratorFromQuery( query, inputs );
try ( CheckpointWriter writer = ctx.createRelWriter( 0, result.left, true ) ) {
writer.write( result.right );
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,22 @@

package org.polypheny.db.workflow.dag.activities.impl;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.logical.relational.LogicalRelUnion;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeFactory;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.util.Pair;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidInputException;
import org.polypheny.db.workflow.dag.activities.Fusable;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
Expand All @@ -38,7 +46,7 @@
inPorts = { @InPort(type = PortType.REL), @InPort(type = PortType.REL) },
outPorts = { @OutPort(type = PortType.REL) }
)
public class RelUnionActivity implements Activity {
public class RelUnionActivity implements Activity, Fusable {

@Override
public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>> inTypes, SettingsPreview settings ) throws ActivityException {
Expand All @@ -48,10 +56,7 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>
if ( firstType.isEmpty() || secondType.isEmpty() ) {
return List.of( Optional.empty() );
}
if ( !firstType.get().equals( secondType.get() ) ) {
throw new InvalidInputException( "The second input type is not equal to the first input type", 1 );
}
return List.of( firstType );
return Activity.wrapType( getTypeOrThrow( List.of( firstType.get(), secondType.get() ) ) );
}


Expand All @@ -61,16 +66,34 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
.queryLanguage( "SQL" )
.query( "SELECT * FROM " + CheckpointQuery.ENTITY( 0 ) + " UNION ALL SELECT * FROM " + CheckpointQuery.ENTITY( 1 ) )
.build();
try ( CheckpointWriter writer = ctx.createRelWriter( 0, inputs.get( 0 ).getTupleType(), true ) ) {
writer.write( inputs.get( 0 ).getIteratorFromQuery( query, inputs ) );
Pair<AlgDataType, Iterator<List<PolyValue>>> result = inputs.get( 0 ).getIteratorFromQuery( query, inputs );
try ( CheckpointWriter writer = ctx.createRelWriter( 0, result.left, true ) ) {
writer.write( result.right );
}

}


@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
System.out.println( "in types: " + inputs.stream().map( AlgNode::getTupleType ).toList() );
return LogicalRelUnion.create( inputs, true );
}


@Override
public void reset() {

}


public static AlgDataType getTypeOrThrow( List<AlgDataType> inputs ) throws InvalidInputException {
AlgDataType type = AlgDataTypeFactory.DEFAULT.leastRestrictive( inputs );

if ( type == null ) {
throw new InvalidInputException( "The tuple types of the inputs are incompatible", 1 );
}
return type;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,29 @@

package org.polypheny.db.workflow.dag.activities.impl;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.logical.relational.LogicalRelValues;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeFactory;
import org.polypheny.db.algebra.type.AlgDataTypeField;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.rex.RexBuilder;
import org.polypheny.db.rex.RexLiteral;
import org.polypheny.db.type.PolyType;
import org.polypheny.db.type.entity.PolyString;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.type.entity.numerical.PolyInteger;
import org.polypheny.db.type.entity.numerical.PolyLong;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.Fusable;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
import org.polypheny.db.workflow.dag.annotations.BoolSetting;
Expand All @@ -49,7 +58,7 @@
)
@IntSetting(key = "rowCount", displayName = "Row Count", defaultValue = 3, min = 1, max = 1_000_000)
@BoolSetting(key = "fixSeed", displayName = "Fix Random Seed", defaultValue = false)
public class RelValuesActivity implements Activity {
public class RelValuesActivity implements Activity, Fusable {

private static final List<String> NAMES = List.of( "Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Hank" );
private static final List<String> LAST_NAMES = List.of( "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis" );
Expand All @@ -72,6 +81,16 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
}


@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
List<List<PolyValue>> values = getValues(
settings.get( "rowCount", IntValue.class ).getValue(),
settings.get( "fixSeed", BoolValue.class ).getValue()
);
return LogicalRelValues.create( cluster, getType(), toRexLiterals( getType(), values ) );
}


@Override
public void reset() {

Expand Down Expand Up @@ -104,8 +123,33 @@ private static List<List<PolyValue>> getValues( int n, boolean fixSeed ) {
}


private static ImmutableList<ImmutableList<RexLiteral>> toRexLiterals( AlgDataType tupleType, List<List<PolyValue>> rows ) {
RexBuilder builder = new RexBuilder( AlgDataTypeFactory.DEFAULT );
List<ImmutableList<RexLiteral>> records = new ArrayList<>();
List<AlgDataType> fieldTypes = tupleType.getFields().stream().map( AlgDataTypeField::getType ).toList();
for ( final List<PolyValue> row : rows ) {
final List<RexLiteral> record = new ArrayList<>();
for ( int i = 0; i < row.size(); ++i ) {
// TODO: fix creation of RexLiteral
PolyValue value = row.get( i );
AlgDataType type = fieldTypes.get( i );
record.add( new RexLiteral( value, type, type.getPolyType() ) );

//record.add( builder.makeLiteral( value, type, type.getPolyType() ) );
}
records.add( ImmutableList.copyOf( record ) );
}
return ImmutableList.copyOf( records );
}


private static List<PolyValue> getRow( String name, String lastName, int age, int salary ) {
return List.of( PolyInteger.of( 0 ), PolyString.of( name ), PolyString.of( lastName ), PolyInteger.of( age ), PolyInteger.of( salary ) );
return List.of(
PolyLong.of( 0 ),
PolyString.of( name ),
PolyString.of( lastName ),
PolyInteger.of( age ),
PolyInteger.of( salary ) );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.polypheny.db.workflow.dag.settings;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BooleanNode;
import lombok.AccessLevel;
import lombok.Getter;
Expand Down Expand Up @@ -52,7 +52,7 @@ public static BoolValue of( JsonNode node ) {


@Override
public JsonNode toJson( JsonMapper mapper ) {
public JsonNode toJson( ObjectMapper mapper ) {
return BooleanNode.valueOf( value );
}

Expand Down
Loading

0 comments on commit 0d1ff95

Please sign in to comment.