Skip to content

Commit

Permalink
Add tests for doc and lpg checkpoints and refactor activities
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 17, 2024
1 parent d862cde commit 1b89e7e
Show file tree
Hide file tree
Showing 39 changed files with 1,203 additions and 140 deletions.
2 changes: 2 additions & 0 deletions plugins/workflow-engine/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ dependencies {
testImplementation(group: "org.polypheny", name: "polypheny-jdbc-driver", version: polypheny_jdbc_driver_version) {
exclude(group: "com.fasterxml.jackson.core")
}

testImplementation group: "com.konghq", name: "unirest-java", version: unirest_version
}

test.dependsOn(":dbms:shadowJar")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ assert canFuse(
}

Iterator<PolyValue[]> iterator = executedContext.getIterator().getIterator();
try ( CheckpointWriter writer = ctx.createWriter( 0, root.validatedRowType, true ) ) {
CheckpointWriter writer = ctx.createWriter( 0, root.validatedRowType, true );
try {
while ( iterator.hasNext() ) {
writer.write( Arrays.asList( iterator.next() ) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ assert canPipe(
AlgDataType type = lockOutputType( inputTypes, settings );
List<InputPipe> inPipes = inputs.stream().map( reader -> (InputPipe) new CheckpointInputPipe( reader ) ).toList();
PipeExecutionContext pipeCtx = (ExecutionContextImpl) ctx;
try ( CheckpointWriter writer = ctx.createWriter( 0, type, true ) ) {

CheckpointWriter writer = ctx.createWriter( 0, type, true );

try {
OutputPipe outPipe = new CheckpointOutputPipe( type, writer );
pipe( inPipes, outPipe, settings, pipeCtx );
} catch ( PipeInterruptedException e ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
@BoolSetting(key = "canPipe", displayName = "Enable Pipelining", defaultValue = false)
@BoolSetting(key = "canFuse", displayName = "Enable Fusion", defaultValue = false)
@BoolSetting(key = "isSuccessful", displayName = "Successful Execution", defaultValue = true)

@SuppressWarnings("unused")
public class DebugActivity implements Activity, Pipeable, Fusable {

@Override
Expand All @@ -69,11 +71,11 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>
@Override
public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
RelReader input = (RelReader) inputs.get( 0 );
try ( RelWriter output = ctx.createRelWriter( 0, input.getTupleType(), false ) ) {
Thread.sleep( settings.get( "delay", IntValue.class ).getValue() );
checkFail( settings );
output.write( input.getIterator() );
}
RelWriter output = ctx.createRelWriter( 0, input.getTupleType(), false );

Thread.sleep( settings.get( "delay", IntValue.class ).getValue() );
checkFail( settings );
output.write( input.getIterator() );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@
inPorts = {},
outPorts = { @OutPort(type = PortType.DOC) })

@EntitySetting(key = COLL_KEY, displayName = "Table", dataModel = DataModel.DOCUMENT, mustExist = true)
@EntitySetting(key = COLL_KEY, displayName = "Collection", dataModel = DataModel.DOCUMENT, mustExist = true)

@SuppressWarnings("unused")
public class DocExtractActivity implements Activity, Fusable, Pipeable {

static final String COLL_KEY = "collection";
public static final String COLL_KEY = "collection";

private LogicalCollection lockedEntity;

Expand All @@ -82,10 +83,8 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>
public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
LogicalCollection collection = settings.get( COLL_KEY, EntityValue.class ).getCollection();
AlgDataType type = getOutputType( collection );

try ( DocWriter writer = ctx.createDocWriter( 0 );
ResultIterator result = getResultIterator( ctx.getTransaction(), collection ) ) { // transaction will get committed or rolled back externally

DocWriter writer = ctx.createDocWriter( 0 );
try ( ResultIterator result = getResultIterator( ctx.getTransaction(), collection ) ) { // transaction will get committed or rolled back externally
for ( Iterator<PolyValue[]> it = result.getIterator(); it.hasNext(); ) {
writer.write( it.next()[0].asDocument() );
}
Expand Down Expand Up @@ -134,7 +133,7 @@ private AlgDataType getOutputType( LogicalCollection collection ) {


private ResultIterator getResultIterator( Transaction transaction, LogicalCollection collection ) {
String query = "db.\"" + collection.getName() + "\".count({})";
String query = "db.\"" + collection.getName() + "\".find({})";

System.out.println( "Before exec" );
long start = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.NotImplementedException;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.plan.AlgCluster;
Expand Down Expand Up @@ -46,6 +45,7 @@
inPorts = { @InPort(type = PortType.DOC) },
outPorts = { @OutPort(type = PortType.DOC) }
)
@SuppressWarnings("unused")
public class DocIdentityActivity implements Activity, Fusable, Pipeable {


Expand All @@ -62,9 +62,8 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>
@Override
public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
DocReader input = (DocReader) inputs.get( 0 );
try ( DocWriter output = ctx.createDocWriter( 0 ) ) {
output.write( input.getIterator() );
}
DocWriter output = ctx.createDocWriter( 0 );
output.write( input.getIterator() );
}


Expand All @@ -84,10 +83,7 @@ public void pipe( List<InputPipe> inputs, OutputPipe output, Settings settings,

@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
// to make it more interesting, we add a project activity that doesn't change the tupleType

//return LogicalDocumentProject
throw new NotImplementedException();
return inputs.get( 0 ); // this does not really test fusion
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2019-2024 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.workflow.dag.activities.impl;

import static org.polypheny.db.workflow.dag.activities.impl.DocLoadActivity.COLL_KEY;

import java.util.List;
import java.util.Optional;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.catalog.entity.logical.LogicalCollection;
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.ActivityException.InvalidSettingException;
import org.polypheny.db.workflow.dag.activities.Pipeable;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
import org.polypheny.db.workflow.dag.annotations.EntitySetting;
import org.polypheny.db.workflow.dag.settings.EntityValue;
import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
import org.polypheny.db.workflow.engine.execution.context.PipeExecutionContext;
import org.polypheny.db.workflow.engine.execution.pipe.InputPipe;
import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe;
import org.polypheny.db.workflow.engine.storage.DocBatchWriter;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.reader.DocReader;

@ActivityDefinition(type = "docLoad", displayName = "Load Collection", categories = { ActivityCategory.LOAD, ActivityCategory.DOCUMENT },
inPorts = { @InPort(type = PortType.DOC) },
outPorts = {})

@EntitySetting(key = COLL_KEY, displayName = "Collection", dataModel = DataModel.DOCUMENT)
@SuppressWarnings("unused")
public class DocLoadActivity implements Activity, Pipeable {

static final String COLL_KEY = "collection";


@Override
public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>> inTypes, SettingsPreview settings ) throws ActivityException {
return List.of();
}


@Override
public void reset() {

}


@Override
public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
LogicalCollection collection = getEntity( settings.get( COLL_KEY, EntityValue.class ) );
DocReader reader = (DocReader) inputs.get( 0 );
write( collection, ctx.getTransaction(), reader.getIterable(), ctx, reader.getDocCount() );
}


@Override
public AlgDataType lockOutputType( List<AlgDataType> inTypes, Settings settings ) throws Exception {
return null;
}


@Override
public void pipe( List<InputPipe> inputs, OutputPipe output, Settings settings, PipeExecutionContext ctx ) throws Exception {
LogicalCollection collection = getEntity( settings.get( COLL_KEY, EntityValue.class ) );
write( collection, ctx.getTransaction(), inputs.get( 0 ), null, 1 );
}


private LogicalCollection getEntity( EntityValue setting ) throws ActivityException {
// TODO: check if the adapter is a data store (and thus writable)
LogicalCollection collection = setting.getCollection();
if ( collection == null ) {
throw new InvalidSettingException( "Specified collection does not exist", COLL_KEY );
}
return collection;
}


private void write( LogicalCollection collection, Transaction transaction, Iterable<List<PolyValue>> tuples, ExecutionContext ctx, long totalTuples ) throws Exception {
long count = 0;
try ( DocBatchWriter writer = new DocBatchWriter( collection, transaction ) ) {
for ( List<PolyValue> tuple : tuples ) {
System.out.println( "Loading value " + tuple.get( 0 ).asDocument() );
writer.write( tuple.get( 0 ).asDocument() );

count++;
if ( ctx != null && count % 1024 == 0 ) {
ctx.updateProgress( (double) count / totalTuples );
ctx.checkInterrupted();
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package org.polypheny.db.workflow.dag.activities.impl;

import static org.polypheny.db.workflow.dag.activities.impl.RelValuesActivity.LAST_NAMES;
import static org.polypheny.db.workflow.dag.activities.impl.RelValuesActivity.NAMES;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -61,10 +64,9 @@
)
@IntSetting(key = "count", displayName = "Document Count", defaultValue = 3, min = 1, max = 1_000_000)
@BoolSetting(key = "fixSeed", displayName = "Fix Random Seed", defaultValue = false)
public class DocValuesActivity implements Activity, Fusable, Pipeable {

private static final List<String> NAMES = List.of( "Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Hank" );
private static final List<String> LAST_NAMES = List.of( "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis" );
@SuppressWarnings("unused")
public class DocValuesActivity implements Activity, Fusable, Pipeable {


@Override
Expand All @@ -75,12 +77,11 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>

@Override
public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
try ( DocWriter writer = ctx.createDocWriter( 0 ) ) {
writer.writeFromIterator( getValues(
settings.get( "count", IntValue.class ).getValue(),
settings.get( "fixSeed", BoolValue.class ).getValue()
).iterator() );
}
DocWriter writer = ctx.createDocWriter( 0 );
writer.writeFromIterator( getValues(
settings.get( "count", IntValue.class ).getValue(),
settings.get( "fixSeed", BoolValue.class ).getValue()
).iterator() );
}


Expand All @@ -98,7 +99,7 @@ public void pipe( List<InputPipe> inputs, OutputPipe output, Settings settings,
Random random = fixSeed ? new Random( 42 ) : new Random();
for ( int i = 0; i < n; i++ ) {
PolyDocument doc = getValue( random );
output.put( List.of( doc ) );
output.put( doc );
log.info( "Value pipe inserted " + doc );
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
)
@StringSetting(key = "variableName", displayName = "Variable Name", defaultValue = "field_names", minLength = 1, maxLength = 128)
@IntSetting(key = "fields", displayName = "Target Fields", isList = true, defaultValue = 0, min = 0)

@SuppressWarnings("unused")
public class FieldNameToVariableActivity implements VariableWriter {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.reader.RelReader;
import org.polypheny.db.workflow.engine.storage.writer.RelWriter;

@ActivityDefinition(type = "identity", displayName = "Identity", categories = { ActivityCategory.TRANSFORM, ActivityCategory.RELATIONAL },
inPorts = { @InPort(type = PortType.REL) },
Expand All @@ -59,6 +58,8 @@
)
@IntSetting(key = "I2", displayName = "THIRD", defaultValue = 0, isList = true, group = "groupA")
@StringSetting(key = "S2", displayName = "FOURTH", defaultValue = "test", isList = true, group = "groupA", subGroup = "a")

@SuppressWarnings("unused")
public class IdentityActivity implements Activity, Fusable, Pipeable {


Expand All @@ -75,9 +76,8 @@ public List<Optional<AlgDataType>> previewOutTypes( List<Optional<AlgDataType>>
@Override
public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
RelReader input = (RelReader) inputs.get( 0 );
try ( RelWriter output = ctx.createRelWriter( 0, input.getTupleType(), false ) ) {
output.write( input.getIterator() );
}
ctx.createRelWriter( 0, input.getTupleType(), false )
.write( input.getIterator() );
}


Expand Down
Loading

0 comments on commit 1b89e7e

Please sign in to comment.