Fix locking issue when concurrently creating checkpoints
tobias-weber committed Dec 12, 2024
1 parent 86b9743 commit 3c92138
Showing 4 changed files with 87 additions and 12 deletions.
@@ -40,7 +40,7 @@
public class GlobalScheduler {

private static GlobalScheduler INSTANCE;
private static final int globalWorkers = 1; // TODO: use config value, allow to change it when no scheduler is running
public static final int GLOBAL_WORKERS = 20; // TODO: use config value, allow to change it when no scheduler is running

private final Map<UUID, WorkflowScheduler> schedulers = new HashMap<>();
private final Map<UUID, Set<ExecutionSubmission>> activeSubmissions = new ConcurrentHashMap<>(); // used for interrupting the execution
@@ -52,7 +52,7 @@ public class GlobalScheduler {


private GlobalScheduler() {
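// Core size 0 with a SynchronousQueue: no task buffering. Each submission is
// handed directly to an idle or newly spawned worker, up to the maximum pool
// size, and idle workers are reclaimed after 60 seconds.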
executor = new ThreadPoolExecutor( 0, globalWorkers,
executor = new ThreadPoolExecutor( 0, GLOBAL_WORKERS,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>() );
completionService = new ExecutorCompletionService<>( executor );
@@ -73,7 +73,7 @@ public synchronized void startExecution( Workflow workflow, StorageManager sm, @
throw new GenericRuntimeException( "Cannot execute a workflow that is already being executed." );
}
interruptedSessions.remove( sessionId );
WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, globalWorkers, targetActivity );
WorkflowScheduler scheduler = new WorkflowScheduler( workflow, sm, GLOBAL_WORKERS, targetActivity );
List<ExecutionSubmission> submissions = scheduler.startExecution();
if ( submissions.isEmpty() ) {
throw new GenericRuntimeException( "At least one activity needs to be executable when submitting a workflow for execution" );
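For intuition, here is a minimal, self-contained sketch of the pool semantics the constructor above relies on (the class name and task bodies are illustrative, not part of the commit): a SynchronousQueue buffers nothing, so each submission is handed directly to a worker thread, up to GLOBAL_WORKERS of which may run concurrently; a further concurrent submission would be rejected rather than queued.

import java.util.concurrent.*;

public class PoolSemanticsDemo {

    public static void main( String[] args ) throws Exception {
        ExecutorService executor = new ThreadPoolExecutor( 0, 20,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<>() ); // direct hand-off, no task buffering
        CompletionService<Integer> completionService = new ExecutorCompletionService<>( executor );
        for ( int i = 0; i < 20; i++ ) { // each task gets its own worker thread
            final int taskId = i;
            completionService.submit( () -> {
                Thread.sleep( 100 );
                return taskId;
            } );
        }
        for ( int i = 0; i < 20; i++ ) {
            System.out.println( "finished task " + completionService.take().get() );
        }
        executor.shutdown(); // a 21st concurrent submit would throw RejectedExecutionException
    }
}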
@@ -19,12 +19,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.NotImplementedException;
import org.polypheny.db.adapter.AdapterManager;
import org.polypheny.db.adapter.DataStore;
@@ -34,6 +36,7 @@
import org.polypheny.db.catalog.entity.logical.LogicalCollection;
import org.polypheny.db.catalog.entity.logical.LogicalEntity;
import org.polypheny.db.catalog.entity.logical.LogicalGraph;
import org.polypheny.db.catalog.entity.logical.LogicalNamespace;
import org.polypheny.db.catalog.entity.logical.LogicalTable;
import org.polypheny.db.catalog.logistic.Collation;
import org.polypheny.db.catalog.logistic.ConstraintType;
@@ -47,8 +50,11 @@
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.transaction.TransactionManager;
import org.polypheny.db.transaction.TransactionManagerImpl;
import org.polypheny.db.transaction.locking.Lockable.LockType;
import org.polypheny.db.transaction.locking.LockablesRegistry;
import org.polypheny.db.type.ArrayType;
import org.polypheny.db.type.PolyType;
import org.polypheny.db.util.DeadlockException;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.reader.DocReader;
import org.polypheny.db.workflow.engine.storage.reader.LpgReader;
@@ -60,6 +66,7 @@
import org.polypheny.db.workflow.models.ActivityConfigModel;
import org.polypheny.db.workflow.models.ActivityConfigModel.CommonType;

@Slf4j
public class StorageManagerImpl implements StorageManager {

public static final String REL_PREFIX = "rel_";
@@ -71,7 +78,7 @@ public class StorageManagerImpl implements StorageManager {
private final UUID sessionId;
private final Map<DataModel, String> defaultStores;
private final Map<UUID, Map<Integer, LogicalEntity>> checkpoints = new ConcurrentHashMap<>();
private final List<String> registeredNamespaces = new ArrayList<>();
private final Map<Long, String> registeredNamespaces = new ConcurrentHashMap<>();
private final AdapterManager adapterManager;
private final TransactionManager transactionManager;
private final DdlManager ddlManager;
@@ -98,10 +105,10 @@ public StorageManagerImpl( UUID sessionId, Map<DataModel, String> defaultStores

relNamespace = ddlManager.createNamespace(
REL_PREFIX + sessionId, DataModel.RELATIONAL, true, false, null );
registeredNamespaces.add( REL_PREFIX + sessionId );
registeredNamespaces.put( relNamespace, REL_PREFIX + sessionId );

docNamespace = ddlManager.createNamespace( DOC_PREFIX + sessionId, DataModel.DOCUMENT, true, false, null );
registeredNamespaces.add( DOC_PREFIX + sessionId );
registeredNamespaces.put( docNamespace, DOC_PREFIX + sessionId );
}


@@ -168,7 +175,8 @@ public List<Optional<AlgDataType>> getOptionalCheckpointTypes( UUID activityId )


@Override
public RelWriter createRelCheckpoint( UUID activityId, int outputIdx, AlgDataType type, boolean resetPk, @Nullable String storeName ) {
// TODO: it should be possible to remove synchronized when schema locking works, but it doesn't hurt to keep it
public synchronized RelWriter createRelCheckpoint( UUID activityId, int outputIdx, AlgDataType type, boolean resetPk, @Nullable String storeName ) {
if ( storeName == null || storeName.isEmpty() ) {
storeName = getDefaultStore( DataModel.RELATIONAL );
}
@@ -184,6 +192,8 @@ public RelWriter createRelCheckpoint( UUID activityId, int outputIdx, AlgDataTyp

Transaction createTransaction = QueryUtils.startTransaction( relNamespace, "RelCreate" );
String tableName = TABLE_PREFIX + activityId + "_" + outputIdx;

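// take the namespace lock before any DDL so concurrent checkpoint creations are serialized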
acquireSchemaLock( createTransaction, relNamespace );
ddlManager.createTable(
relNamespace,
tableName,
@@ -204,13 +214,13 @@ public RelWriter createRelCheckpoint( UUID activityId, int outputIdx, AlgDataTyp


@Override
public DocWriter createDocCheckpoint( UUID activityId, int outputIdx, @Nullable String storeName ) {
public synchronized DocWriter createDocCheckpoint( UUID activityId, int outputIdx, @Nullable String storeName ) {
throw new NotImplementedException();
}


@Override
public LpgWriter createLpgCheckpoint( UUID activityId, int outputIdx, @Nullable String storeName ) {
public synchronized LpgWriter createLpgCheckpoint( UUID activityId, int outputIdx, @Nullable String storeName ) {
throw new NotImplementedException();
}

@@ -335,6 +345,7 @@ private DataStore<?> getStore( String storeName ) {
private void dropEntity( LogicalEntity entity ) {
Transaction transaction = startTransaction( entity );
Statement statement = transaction.createStatement();
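// entity drops are DDL as well, so the namespace lock is required here too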
acquireSchemaLock( transaction, entity.getNamespaceId() );
switch ( entity.dataModel ) {
case RELATIONAL -> ddlManager.dropTable( (LogicalTable) entity, statement );
case DOCUMENT -> ddlManager.dropCollection( (LogicalCollection) entity, statement );
@@ -346,10 +357,12 @@ private DataStore<?> getStore( String storeName ) {

private void dropNamespaces() {
Transaction transaction = QueryUtils.startTransaction( relNamespace, "DropNamespaces" );
for ( String ns : registeredNamespaces ) {
ddlManager.dropNamespace( ns, true, transaction.createStatement() );
for ( Entry<Long, String> entry : registeredNamespaces.entrySet() ) {
acquireSchemaLock( transaction, entry.getKey() );
ddlManager.dropNamespace( entry.getValue(), true, transaction.createStatement() );
}
transaction.commit();
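// clearing prevents a repeated close() from attempting to drop the namespaces again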
registeredNamespaces.clear();
}

// Utils:
@@ -408,6 +421,12 @@ private ColumnTypeInformation getColTypeInfo( AlgDataTypeField field ) {
}


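/**
 * Converts the given namespace into a lockable and acquires it exclusively
 * for the transaction, so that checkpoint DDL (table creation, entity and
 * namespace drops) from concurrently executing activities is serialized.
 */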
private void acquireSchemaLock( Transaction transaction, long namespaceId ) throws DeadlockException {
LogicalNamespace namespace = Catalog.getInstance().getSnapshot().getNamespace( namespaceId ).orElseThrow();
transaction.acquireLockable( LockablesRegistry.convertToLockable( namespace ), LockType.EXCLUSIVE );
}


/**
* In practice, calling close should not be required, since the transactions should be closed manually
*/
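The fix follows a common serialization pattern; below is a minimal, self-contained sketch under illustrative names (not Polypheny APIs, which instead go through LockablesRegistry and Transaction.acquireLockable): all DDL touching a namespace first acquires that namespace's exclusive lock, so concurrent checkpoint creation and cleanup serialize instead of racing on shared catalog state.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class NamespaceLocks {

    // one lock per namespace id, created on demand
    private static final Map<Long, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

    static void runExclusively( long namespaceId, Runnable ddl ) {
        ReentrantLock lock = LOCKS.computeIfAbsent( namespaceId, id -> new ReentrantLock() );
        lock.lock(); // blocks until no other thread is running DDL on this namespace
        try {
            ddl.run();
        } finally {
            lock.unlock();
        }
    }
}

A createRelCheckpoint call would then wrap its createTable in runExclusively( relNamespace, ... ); the real implementation additionally ties the lock to the transaction, presumably releasing it when the transaction finishes.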
@@ -234,6 +234,20 @@ public static Workflow getVariableWritingWorkflow() {
return getWorkflow( activities, edges, false, false, 1 );
}

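/**
 * Builds a workflow whose root activity fans out into {@code nBranches}
 * independent "debug" activities that each sleep for {@code millisPerBranch}
 * milliseconds, making concurrent execution observable via wall-clock time.
 */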
public static Workflow getParallelBranchesWorkflow( int nBranches, int millisPerBranch, int maxWorkers ) {
assert nBranches > 0 && millisPerBranch > 0;
ActivityModel root = new ActivityModel( "relValues" );
List<ActivityModel> activities = new ArrayList<>( List.of( root ) );
List<EdgeModel> edges = new ArrayList<>();

for ( int i = 0; i < nBranches; i++ ) {
ActivityModel branch = new ActivityModel( "debug", Map.of( "delay", IntNode.valueOf( millisPerBranch ) ) );
activities.add( branch );
edges.add( EdgeModel.of( root, branch ) );
}
return getWorkflow( activities, edges, false, false, maxWorkers );
}


public static List<UUID> getTopologicalActivityIds( Workflow workflow ) {
List<UUID> list = new ArrayList<>();
@@ -18,15 +18,16 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.polypheny.db.TestHelper;
import org.polypheny.db.type.entity.PolyValue;
@@ -236,6 +237,47 @@ void variableWritingTest() throws Exception {
}


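// Timing-based concurrency check: nBranches activities sleeping `delay` ms each
// must all finish within `delay` + 2s, which is only possible if they run in parallel.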
@Test
void concurrentActivityExecutionTest() throws Exception {
int nBranches = 10;
int delay = 1000;
assert nBranches <= GlobalScheduler.GLOBAL_WORKERS;

Workflow workflow = WorkflowUtils.getParallelBranchesWorkflow( nBranches, delay, nBranches );
List<UUID> ids = WorkflowUtils.getTopologicalActivityIds( workflow );
scheduler.startExecution( workflow, sm, null );
scheduler.awaitResultProcessor( delay + 2000 ); // not enough time if not executed concurrently

for ( int i = 0; i <= nBranches; i++ ) { // also checks initial activity
assertTrue( sm.hasCheckpoint( ids.get( i ), 0 ) );
}
}


@Test
void concurrentWorkflowExecutionTest() throws Exception {
int nWorkflows = 10;
int delay = 1000;
assert nWorkflows <= GlobalScheduler.GLOBAL_WORKERS;

List<Workflow> workflows = new ArrayList<>();
List<StorageManager> storageManagers = new ArrayList<>();
for ( int i = 0; i < nWorkflows; i++ ) {
StorageManager storageManager = new StorageManagerImpl( UUID.randomUUID(), StorageUtils.getDefaultStoreMap( "locks" ) );
Workflow workflow = WorkflowUtils.getLongRunningPipe( delay );
workflows.add( workflow );
storageManagers.add( storageManager );
scheduler.startExecution( workflow, storageManager, null );
}
scheduler.awaitResultProcessor( delay + 2000 ); // not enough time if not executed concurrently

for ( Pair<Workflow, StorageManager> entry : Pair.zip( workflows, storageManagers ) ) { // the final checkpoint only exists if the entire pipe executed
List<UUID> ids = WorkflowUtils.getTopologicalActivityIds( entry.left );
assertTrue( entry.right.hasCheckpoint( ids.get( ids.size() - 1 ), 0 ) );
}
}


private void checkFailed( Workflow workflow, List<UUID> failedActivityIds ) {
for ( UUID id : failedActivityIds ) {
assertFalse( sm.hasCheckpoint( id, 0 ) );
