Skip to content

Commit

Permalink
Add sample workflows to resources directory
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 23, 2024
1 parent 7e5c317 commit 6470060
Show file tree
Hide file tree
Showing 26 changed files with 2,098 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
package org.polypheny.db.workflow;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.javalin.http.ContentType;
import io.javalin.http.Context;
import io.javalin.http.HttpCode;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -38,18 +37,18 @@
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.ddl.DdlManager;
import org.polypheny.db.util.RunMode;
import org.polypheny.db.util.Sources;
import org.polypheny.db.webui.ConfigService.HandlerType;
import org.polypheny.db.webui.HttpServer;
import org.polypheny.db.workflow.dag.activities.ActivityRegistry;
import org.polypheny.db.workflow.dag.activities.ActivityWrapper;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContextImpl;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.engine.storage.StorageManagerImpl;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.models.ActivityConfigModel;
import org.polypheny.db.workflow.models.ActivityModel;
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.RenderModel;
import org.polypheny.db.workflow.models.WorkflowConfigModel;
import org.polypheny.db.workflow.models.WorkflowModel;
import org.polypheny.db.workflow.models.requests.CreateSessionRequest;
import org.polypheny.db.workflow.models.requests.SaveSessionRequest;
Expand Down Expand Up @@ -155,40 +154,27 @@ private void createExecuteDummyWorkflowTest() {


private void addSampleWorkflows() {
try {
UUID id = repo.createWorkflow( "Test Workflow" );
repo.writeVersion( id, "Initial Version", getWorkflow1() );

} catch ( WorkflowRepoException e ) {
throw new RuntimeException( e );
if ( PolyphenyDb.mode == RunMode.TEST) {
return;
}
URL workflowDir = this.getClass().getClassLoader().getResource( "workflows/" );
File[] files = Sources.of( workflowDir )
.file()
.listFiles( ( d, name ) -> name.endsWith( ".json" ) );
if (files == null) {
return;
}
}


// TODO: replace with workflows in resource directory
private WorkflowModel getWorkflow( List<ActivityModel> activities, List<EdgeModel> edges, boolean fusionEnabled, boolean pipelineEnabled, int maxWorkers ) {
WorkflowConfigModel config = new WorkflowConfigModel(
Map.of( DataModel.RELATIONAL, "hsqldb", DataModel.DOCUMENT, "hsqldb", DataModel.GRAPH, "hsqldb" ),
fusionEnabled,
pipelineEnabled,
maxWorkers,
10 // low on purpose to observe blocking
);
Map<String, JsonNode> variables = Map.of( "creationTime", TextNode.valueOf( LocalDateTime.now().format( DateTimeFormatter.ISO_DATE_TIME ) ) );
return new WorkflowModel( activities, edges, config, variables );
}


// TODO: replace with workflows in resource directory
private WorkflowModel getWorkflow1() {
List<ActivityModel> activities = List.of(
new ActivityModel( "relValues" ),
new ActivityModel( "debug" )
);
List<EdgeModel> edges = List.of(
EdgeModel.of( activities.get( 0 ), activities.get( 1 ) )
);
return getWorkflow( activities, edges, false, false, 1 );
for (File file : files) {
String fileName = file.getName();
try {
WorkflowModel workflow = mapper.readValue( file, WorkflowModel.class );
UUID wId = repo.createWorkflow( fileName );
repo.writeVersion( wId, "Created Sample Workflow", workflow );
} catch ( IOException e ) {
throw new RuntimeException( e );
}
}
}


Expand Down Expand Up @@ -219,6 +205,7 @@ private void registerEndpoints() {
server.addSerializedRoute( PATH + "/sessions/{sessionId}/workflow/{activityId}", this::getActivity, HandlerType.GET );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/workflow/{activityId}/{outIndex}", this::getIntermediaryResult, HandlerType.GET );
server.addSerializedRoute( PATH + "/workflows", this::getWorkflowDefs, HandlerType.GET );
server.addSerializedRoute( PATH + "/registry", this::getActivityRegistry, HandlerType.GET );

server.addSerializedRoute( PATH + "/sessions", this::createSession, HandlerType.POST );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/save", this::saveSession, HandlerType.POST );
Expand Down Expand Up @@ -272,6 +259,11 @@ private void getWorkflowDefs( final Context ctx ) {
}


private void getActivityRegistry( final Context ctx ) {
process( ctx, ActivityRegistry::getRegistry );
}


private void createSession( final Context ctx ) {
CreateSessionRequest request = ctx.bodyAsClass( CreateSessionRequest.class );
process( ctx, () -> sessionManager.createUserSession( request.getName() ) );
Expand Down Expand Up @@ -308,10 +300,13 @@ private void terminateSession( final Context ctx ) {
private void process( Context ctx, ResultSupplier s ) {
try {
sendResult( ctx, s.get() );
} catch ( WorkflowRepoException e ) {
ctx.status( e.getErrorCode() );
ctx.json( e.getMessage() );
} catch ( Exception e ) {
// TODO: better error handling
ctx.status( HttpCode.INTERNAL_SERVER_ERROR );
ctx.json( e );
ctx.json( e.getMessage() );
e.printStackTrace();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public ActivityModel toModel( boolean includeState ) {
if ( includeState ) {
List<TypePreviewModel> inTypeModels = inTypePreview.stream().map(
inType -> inType.map( TypePreviewModel::of ).orElse( null ) ).toList();
return new ActivityModel( type, id, serializableSettings, config, rendering, this.state, inTypeModels, invalidStateReason.toString() );
String invalidReason = invalidStateReason == null ? null : invalidStateReason.toString();
return new ActivityModel( type, id, serializableSettings, config, rendering, this.state, inTypeModels, invalidReason );

} else {
return new ActivityModel( type, id, serializableSettings, config, rendering );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeFactory;
import org.polypheny.db.algebra.type.GraphType;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.type.PolyType;
import org.polypheny.db.type.entity.PolyString;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.type.entity.graph.PolyDictionary;
Expand Down Expand Up @@ -143,7 +142,7 @@ public void reset() {


private static AlgDataType getType() {
return AlgDataTypeFactory.DEFAULT.createPolyType( PolyType.NODE );
return GraphType.of();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ControlEdge( ActivityWrapper from, ActivityWrapper to, int controlPort )

public EdgeModel toModel( boolean includeState ) {
EdgeState state = includeState ? getState() : null;
return new EdgeModel( from.getId(), to.getId(), getControlPort(), 0, false, state );
return new EdgeModel( from.getId(), to.getId(), getControlPort(), 0, true, state );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void validateValue( SettingValue value ) throws InvalidSettingException {
case GRAPH -> entityValue.getGraph();
};
if ( entity == null ) {
throw new InvalidSettingException( "Entity does not exist", getKey() );
throw new InvalidSettingException( "Entity " + entityValue.getNamespace() + "." + entityValue.getName() + " does not exist", getKey() );
}
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class WorkflowConfigModel {
boolean pipelineEnabled;
int maxWorkers;
int pipelineQueueCapacity;
// TODO: config value for changing behavior of deleting created checkpoints


public static WorkflowConfigModel of() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.polypheny.db.workflow.repo;

import io.javalin.http.HttpCode;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -207,13 +208,27 @@ static UUID getIdFromString( String str ) {
@Getter
class WorkflowRepoException extends IOException {

private final HttpCode errorCode;


WorkflowRepoException( String message, Throwable cause, HttpCode errorCode ) {
super( message, cause );
this.errorCode = errorCode;
}


WorkflowRepoException( String message ) {
super( message );
this( message, null, HttpCode.INTERNAL_SERVER_ERROR );
}


WorkflowRepoException( String message, HttpCode errorCode ) {
this( message, null, errorCode );
}


WorkflowRepoException( String message, Throwable cause ) {
super( message, cause );
this( message, cause, HttpCode.INTERNAL_SERVER_ERROR );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.polypheny.db.workflow.repo;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.javalin.http.HttpCode;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
Expand Down Expand Up @@ -86,7 +87,7 @@ public WorkflowDefModel getWorkflowDef( UUID id ) throws WorkflowRepoException {
File dir = getWorkflowDir( id );
File defFile = new File( dir, DEF_FILE );
if ( !defFile.exists() ) {
throw new WorkflowRepoException( DEF_FILE + " not found for workflow ID: " + id );
throw new WorkflowRepoException( DEF_FILE + " not found for workflow ID: " + id, HttpCode.NOT_FOUND );
}
try {
return mapper.readValue( defFile, WorkflowDefModel.class );
Expand All @@ -99,7 +100,7 @@ public WorkflowDefModel getWorkflowDef( UUID id ) throws WorkflowRepoException {
@Override
public UUID createWorkflow( String name ) throws WorkflowRepoException {
if ( doesNameExist( name ) ) {
throw new WorkflowRepoException( "Name already exists: " + name );
throw new WorkflowRepoException( "Name already exists: " + name, HttpCode.CONFLICT );
}

UUID id = UUID.randomUUID();
Expand All @@ -124,7 +125,7 @@ public WorkflowModel readVersion( UUID id, int version ) throws WorkflowRepoExce
File file = new File( dir, version + ".json" );

if ( !doesExist( id, version ) || !file.exists() ) {
throw new WorkflowRepoException( "Workflow " + id + ", v" + version + " does not exist." );
throw new WorkflowRepoException( "Workflow " + id + ", v" + version + " does not exist.", HttpCode.NOT_FOUND );
}

try {
Expand All @@ -151,7 +152,7 @@ public int writeVersion( UUID id, String description, WorkflowModel wf ) throws
@Override
public void deleteWorkflow( UUID id ) throws WorkflowRepoException {
if ( !doesExist( id ) ) {
throw new WorkflowRepoException( "Unable to delete non-existent workflow " + id );
throw new WorkflowRepoException( "Unable to delete non-existent workflow " + id, HttpCode.NOT_FOUND );
}
if ( !phm.recursiveDeleteFolder( WORKFLOWS_PATH + "/" + id.toString() ) ) {
throw new WorkflowRepoException( "Failed to delete workflow " + id );
Expand All @@ -162,13 +163,13 @@ public void deleteWorkflow( UUID id ) throws WorkflowRepoException {
@Override
public void deleteVersion( UUID id, int version ) throws WorkflowRepoException {
if ( !doesExist( id, version ) ) {
throw new WorkflowRepoException( "Unable to delete non-existent workflow version " + id + " v" + version );
throw new WorkflowRepoException( "Unable to delete non-existent workflow version " + id + " v" + version, HttpCode.NOT_FOUND );
}

File dir = getWorkflowDir( id );
File versionFile = new File( dir, version + ".json" );
if ( !versionFile.exists() ) {
throw new WorkflowRepoException( "Version file " + versionFile.getName() + " not found for workflow " + id );
throw new WorkflowRepoException( "Version file " + versionFile.getName() + " not found for workflow " + id, HttpCode.NOT_FOUND );
}
if ( !versionFile.delete() ) {
throw new WorkflowRepoException( "Failed to delete version file: " + versionFile.getAbsolutePath() );
Expand All @@ -187,7 +188,7 @@ public void renameWorkflow( UUID id, String name ) throws WorkflowRepoException
return; // same name as before
}
if ( doesNameExist( name ) ) {
throw new WorkflowRepoException( "A workflow with name " + name + " already exists" );
throw new WorkflowRepoException( "A workflow with name " + name + " already exists", HttpCode.CONFLICT );
}
def.setName( name );
serializeToFile( new File( getWorkflowDir( id ), DEF_FILE ), def ); // updated definition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public static SessionManager getInstance() {

public UUID createUserSession( String newWorkflowName ) throws WorkflowRepoException {
UUID wId = repo.createWorkflow( newWorkflowName );
UUID sessionId = registerUserSession( new WorkflowImpl(), wId, 0 );
saveUserSession( sessionId, "Initial Save" ); // TODO: remove initial save, delete workflow if its session is stopped without a saved version
return registerUserSession( new WorkflowImpl(), wId, 0 );
}

Expand Down
Loading

0 comments on commit 6470060

Please sign in to comment.