Skip to content

Commit

Permalink
Add endpoint for opening a stored workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Dec 20, 2024
1 parent d164f75 commit 7e5c317
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package org.polypheny.db.workflow;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.javalin.http.ContentType;
import io.javalin.http.Context;
import io.javalin.http.HttpCode;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -43,7 +47,10 @@
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.models.ActivityConfigModel;
import org.polypheny.db.workflow.models.ActivityModel;
import org.polypheny.db.workflow.models.EdgeModel;
import org.polypheny.db.workflow.models.RenderModel;
import org.polypheny.db.workflow.models.WorkflowConfigModel;
import org.polypheny.db.workflow.models.WorkflowModel;
import org.polypheny.db.workflow.models.requests.CreateSessionRequest;
import org.polypheny.db.workflow.models.requests.SaveSessionRequest;
import org.polypheny.db.workflow.repo.WorkflowRepo;
Expand Down Expand Up @@ -73,6 +80,7 @@ public WorkflowManager() {
@Override
public void run() {
registerAdapter(); // TODO: only register adapter when the first workflow is opened
addSampleWorkflows();
}
},
1000
Expand Down Expand Up @@ -146,6 +154,44 @@ private void createExecuteDummyWorkflowTest() {
}


private void addSampleWorkflows() {
try {
UUID id = repo.createWorkflow( "Test Workflow" );
repo.writeVersion( id, "Initial Version", getWorkflow1() );

} catch ( WorkflowRepoException e ) {
throw new RuntimeException( e );
}
}


// TODO: replace with workflows in resource directory
private WorkflowModel getWorkflow( List<ActivityModel> activities, List<EdgeModel> edges, boolean fusionEnabled, boolean pipelineEnabled, int maxWorkers ) {
WorkflowConfigModel config = new WorkflowConfigModel(
Map.of( DataModel.RELATIONAL, "hsqldb", DataModel.DOCUMENT, "hsqldb", DataModel.GRAPH, "hsqldb" ),
fusionEnabled,
pipelineEnabled,
maxWorkers,
10 // low on purpose to observe blocking
);
Map<String, JsonNode> variables = Map.of( "creationTime", TextNode.valueOf( LocalDateTime.now().format( DateTimeFormatter.ISO_DATE_TIME ) ) );
return new WorkflowModel( activities, edges, config, variables );
}


// TODO: replace with workflows in resource directory
private WorkflowModel getWorkflow1() {
List<ActivityModel> activities = List.of(
new ActivityModel( "relValues" ),
new ActivityModel( "debug" )
);
List<EdgeModel> edges = List.of(
EdgeModel.of( activities.get( 0 ), activities.get( 1 ) )
);
return getWorkflow( activities, edges, false, false, 1 );
}


private void registerAdapter() {
if ( PolyphenyDb.mode == RunMode.TEST || Catalog.getInstance().getAdapters().values().stream().anyMatch( a -> a.adapterName.equals( "hsqldb_disk" ) ) ) {
return;
Expand All @@ -164,7 +210,7 @@ private void registerAdapter() {
private void registerEndpoints() {
HttpServer server = HttpServer.getInstance();

server.addWebsocketRoute( PATH + "/websocket/{sessionId}", new WorkflowWebSocket() );
server.addWebsocketRoute( PATH + "/webSocket/{sessionId}", new WorkflowWebSocket() );

server.addSerializedRoute( PATH + "/sessions", this::getSessions, HandlerType.GET );
server.addSerializedRoute( PATH + "/sessions/{sessionId}", this::getSession, HandlerType.GET );
Expand All @@ -176,6 +222,7 @@ private void registerEndpoints() {

server.addSerializedRoute( PATH + "/sessions", this::createSession, HandlerType.POST );
server.addSerializedRoute( PATH + "/sessions/{sessionId}/save", this::saveSession, HandlerType.POST );
server.addSerializedRoute( PATH + "/workflows/{workflowId}/{version}", this::openWorkflow, HandlerType.POST );

server.addSerializedRoute( PATH + "/sessions/{sessionId}", this::terminateSession, HandlerType.DELETE );
}
Expand Down Expand Up @@ -231,6 +278,14 @@ private void createSession( final Context ctx ) {
}


private void openWorkflow( final Context ctx ) {
UUID workflowId = UUID.fromString( ctx.pathParam( "workflowId" ) );
int version = Integer.parseInt( ctx.pathParam( "version" ) );
// TODO: combine with CreateSessionRequest into createSession endpoint?
process( ctx, () -> sessionManager.createUserSession( workflowId, version ) );
}


private void saveSession( final Context ctx ) {
UUID sessionId = UUID.fromString( ctx.pathParam( "sessionId" ) );
SaveSessionRequest request = ctx.bodyAsClass( SaveSessionRequest.class );
Expand All @@ -253,10 +308,11 @@ private void terminateSession( final Context ctx ) {
private void process( Context ctx, ResultSupplier s ) {
try {
sendResult( ctx, s.get() );
} catch ( WorkflowRepoException e ) {
} catch ( Exception e ) {
// TODO: better error handling
ctx.status( HttpCode.INTERNAL_SERVER_ERROR );
ctx.json( e );
e.printStackTrace();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@ public class SessionModel {

SessionModelType type;
UUID sessionId; // TODO: remove redundant session id, since it's already the map key? Or send list
int connectionCount;

// USER_SESSION fields:
UUID workflowId;
Integer version;


public SessionModel( SessionModelType type, UUID sId ) {
public SessionModel( SessionModelType type, UUID sId, int connectionCount ) {
this.type = type;
this.sessionId = sId;
this.connectionCount = connectionCount;
this.workflowId = null;
this.version = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,13 @@ public WorkflowModel() {
variables = Map.of();
}


public WorkflowModel( List<ActivityModel> activities, List<EdgeModel> edges, WorkflowConfigModel config, Map<String, JsonNode> variables ) {
this.activities = activities;
this.edges = edges;
this.config = config;
this.variables = variables;
this.state = null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public void unsubscribe( Session session ) {
subscribers.remove( session );
}

public int getSubscriberCount() {
return subscribers.size();
}


public abstract SessionModel toModel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void terminate() {

@Override
public SessionModel toModel() {
return new SessionModel( SessionModelType.API_SESSION, sessionId );
return new SessionModel( SessionModelType.API_SESSION, sessionId, getSubscriberCount() );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void handleRequest( InterruptRequest request ) {

@Override
public SessionModel toModel() {
return new SessionModel( SessionModelType.USER_SESSION, sessionId, wId, openedVersion );
return new SessionModel( SessionModelType.USER_SESSION, sessionId, getSubscriberCount(), wId, openedVersion );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static Workflow getWorkflow( List<ActivityModel> activities, List<EdgeMod
10 // low on purpose to observe blocking
);
Map<String, JsonNode> variables = Map.of( "creationTime", TextNode.valueOf( LocalDateTime.now().format( DateTimeFormatter.ISO_DATE_TIME ) ) );
return WorkflowImpl.fromModel( new WorkflowModel( activities, edges, config, variables, null ) );
return WorkflowImpl.fromModel( new WorkflowModel( activities, edges, config, variables ) );
}


Expand Down

0 comments on commit 7e5c317

Please sign in to comment.