Skip to content

Commit

Permalink
Closes Taskana#2599 - add locking to Query to enable adapter clustering
Browse files Browse the repository at this point in the history
  • Loading branch information
ryzheboka committed Aug 7, 2024
1 parent bbad969 commit ef05fc1
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 8 deletions.
6 changes: 6 additions & 0 deletions lib/taskana-core-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@
<version>${version.equalsverifier}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>pro.taskana</groupId>
<artifactId>taskana-common-test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.catchThrowableOfType;
import static pro.taskana.task.api.CallbackState.CALLBACK_PROCESSING_REQUIRED;
import static pro.taskana.testapi.DefaultTestEntities.defaultTestClassification;
import static pro.taskana.testapi.DefaultTestEntities.defaultTestObjectReference;
import static pro.taskana.testapi.DefaultTestEntities.defaultTestWorkbasket;

import java.security.PrivilegedAction;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.security.auth.Subject;
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DynamicTest;
Expand All @@ -26,8 +33,13 @@
import pro.taskana.common.api.IntInterval;
import pro.taskana.common.api.KeyDomain;
import pro.taskana.common.api.TimeInterval;
import pro.taskana.common.api.exceptions.SystemException;
import pro.taskana.common.api.security.CurrentUserContext;
import pro.taskana.common.api.security.UserPrincipal;
import pro.taskana.common.internal.InternalTaskanaEngine;
import pro.taskana.common.internal.util.CheckedConsumer;
import pro.taskana.common.internal.util.Pair;
import pro.taskana.common.test.util.ParallelThreadHelper;
import pro.taskana.task.api.CallbackState;
import pro.taskana.task.api.TaskCustomField;
import pro.taskana.task.api.TaskCustomIntField;
Expand All @@ -54,6 +66,7 @@
class TaskQueryImplAccTest {

@TaskanaInject TaskService taskService;
@TaskanaInject InternalTaskanaEngine internalTaskanaEngine;
@TaskanaInject WorkbasketService workbasketService;
@TaskanaInject CurrentUserContext currentUserContext;
@TaskanaInject ClassificationService classificationService;
Expand Down Expand Up @@ -98,6 +111,104 @@ private void persistPermission(WorkbasketSummary workbasketSummary) throws Excep
.buildAndStore(workbasketService, "businessadmin");
}

@Nested
@TestInstance(Lifecycle.PER_CLASS)
class LockResultsEqualsTest {
private static final Integer LOCK_RESULTS_EQUALS = 2;
WorkbasketSummary wb1;
TaskSummary taskSummary1;
TaskSummary taskSummary2;
TaskSummary taskSummary3;
TaskSummary taskSummary4;

@WithAccessId(user = "user-1-1")
@BeforeAll
void setup() throws Exception {
wb1 = createWorkbasketWithPermission();

taskSummary1 = taskInWorkbasket(wb1).state(TaskState.READY)
.buildAndStoreAsSummary(taskService);
taskSummary2 = taskInWorkbasket(wb1).state(TaskState.READY)
.buildAndStoreAsSummary(taskService);
taskSummary3 =
taskInWorkbasket(wb1).state(TaskState.READY)
.buildAndStoreAsSummary(taskService);
taskSummary4 = taskInWorkbasket(wb1).state(TaskState.READY)
.buildAndStoreAsSummary(taskService);

}

@Test
void should_ReturnDifferentTasks_For_LockResultsEqualsTwo() throws Exception {
if (System.getenv("DB") != null && (System.getenv("DB").equals("POSTGRES")
|| System.getenv("DB").equals("DB2"))) {

List<TaskSummary> returnedTasks = Collections.synchronizedList(new ArrayList<>());
List<String> accessIds =
Collections.synchronizedList(
Stream.of("admin", "admin")
.collect(Collectors.toList()));

ParallelThreadHelper.runInThread(
getRunnableTest(returnedTasks, accessIds), accessIds.size());

assertThat(returnedTasks)
.extracting(TaskSummary::getId)
.containsExactlyInAnyOrder(
taskSummary1.getId(), taskSummary2.getId(), taskSummary3.getId(),
taskSummary4.getId());
}
}

@Test
void should_ThrowException_When_UsingLockResultsWithSelectAndClaim() {
ThrowingCallable call = () -> taskService.selectAndClaim(taskService
.createTaskQuery()
.workbasketIdIn(wb1.getId())
.stateIn(TaskState.READY)
.lockResultsEquals(LOCK_RESULTS_EQUALS));
IllegalArgumentException e = catchThrowableOfType(IllegalArgumentException.class, call);
assertThat(e).isNotNull();
assertThat(e.getMessage()).isEqualTo("The params \"lockResultsEquals\" and "
+ "\"selectAndClaim\" cannot be used together!");
}

private Runnable getRunnableTest(List<TaskSummary> listedTasks, List<String> accessIds) {
return () -> {
Subject subject = new Subject();
subject.getPrincipals().add(new UserPrincipal(accessIds.remove(0)));

Consumer<TaskService> consumer =
CheckedConsumer.wrap(
taskService -> {
internalTaskanaEngine.executeInDatabaseConnection(() -> {
List<TaskSummary> results = taskService
.createTaskQuery()
.workbasketIdIn(wb1.getId())
.stateIn(TaskState.READY)
.lockResultsEquals(LOCK_RESULTS_EQUALS).list();
listedTasks.addAll(results);
for (TaskSummary task : results) {
try {
taskService.claim(task.getId());
} catch (Exception e) {
throw new SystemException(e.getMessage());
}
}
});
});


PrivilegedAction<Void> action =
() -> {
consumer.accept(taskService);
return null;
};
Subject.doAs(subject, action);
};
}
}

@Nested
@TestInstance(Lifecycle.PER_CLASS)
class PermissionsTest {
Expand Down
11 changes: 11 additions & 0 deletions lib/taskana-core/src/main/java/pro/taskana/task/api/TaskQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -1905,4 +1905,15 @@ TaskQuery orderByCustomIntAttribute(
* @return the query
*/
TaskQuery orderByWorkbasketName(SortDirection sortDirection);

/**
* This method locks the returned rows until the end of the transaction using the FOR UPDATE lock.
* It cannot be used together with selectAndClaim.
*
* @param lockResults determines the number of returned and locked results;
* if zero, no results are locked, but the number of returned results is not
* limited
* @return the query
*/
TaskQuery lockResultsEquals(Integer lockResults);
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ Task forceClaim(String taskId)

/**
* Selects and claims the first {@linkplain Task} which is returned by the {@linkplain TaskQuery}.
* It cannot be used together with the {@linkplain TaskQuery#lockResultsEquals(Integer)}
* parameter of the query.
*
* @param taskQuery the {@linkplain TaskQuery}
* @return the {@linkplain Task} that got selected and claimed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ public class TaskQueryImpl implements TaskQuery {
private CallbackState[] callbackStateNotIn;
private WildcardSearchField[] wildcardSearchFieldIn;
private String wildcardSearchValueLike;
private Integer lockResults;

TaskQueryImpl(InternalTaskanaEngine taskanaEngine) {
this.taskanaEngine = taskanaEngine;
Expand All @@ -345,6 +346,7 @@ public class TaskQueryImpl implements TaskQuery {
this.orderByInner = new ArrayList<>();
this.filterByAccessIdIn = true;
this.withoutAttachment = false;
this.lockResults = 0;
this.joinWithUserInfo = taskanaEngine.getEngine().getConfiguration().isAddAdditionalUserInfo();
}

Expand Down Expand Up @@ -2076,6 +2078,7 @@ public TaskSummary single() {
TaskSummary result;
try {
taskanaEngine.openConnection();
checkForIllegalParamCombinations();
checkOpenReadAndReadTasksPermissionForSpecifiedWorkbaskets();
setupAccessIds();
setupJoinAndOrderParameters();
Expand Down Expand Up @@ -2116,11 +2119,17 @@ public TaskQuery selectAndClaimEquals(boolean selectAndClaim) {
return this;
}

public TaskQuery lockResultsEquals(Integer lockResults) {
this.lockResults = lockResults;
return this;
}

// optimized query for db2 can't be used for now in case of selectAndClaim because of temporary
// tables and the "for update" clause clashing in db2
private String getLinkToMapperScript() {
if (DB.DB2 == getDB()
&& !selectAndClaim
&& lockResults == 0
&& taskanaEngine.getEngine().getConfiguration().isUseSpecificDb2Taskquery()) {
return LINK_TO_MAPPER_DB2;
} else if (selectAndClaim && DB.ORACLE == getDB()) {
Expand Down Expand Up @@ -2159,6 +2168,11 @@ private void checkForIllegalParamCombinations() {
"The params \"wildcardSearchFieldIn\" and \"wildcardSearchValueLike\""
+ " must be used together!");
}
if (selectAndClaim && lockResults != null && lockResults != 0) {
throw new IllegalArgumentException(
"The params \"lockResultsEquals\" and \"selectAndClaim\""
+ " cannot be used together!");
}
if (withoutAttachment
&& (attachmentChannelIn != null
|| attachmentChannelLike != null
Expand Down Expand Up @@ -2810,8 +2824,8 @@ public String toString() {
+ Arrays.toString(wildcardSearchFieldIn)
+ ", wildcardSearchValueLike="
+ wildcardSearchValueLike
+ ", joinWithUserInfo="
+ joinWithUserInfo
+ ", lockResults="
+ lockResults
+ "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,18 @@ public static String queryTaskSummaries() {
+ "<if test='selectAndClaim == true'> "
+ "FETCH FIRST ROW ONLY FOR UPDATE "
+ "</if>"
+ "<if test=\"_databaseId == 'db2' and selectAndClaim \">WITH RS USE "
+ "<if test='lockResults and lockResults != 0'> "
+ "FETCH FIRST ${lockResults} ROWS ONLY FOR UPDATE "
+ "<if test=\"_databaseId == 'postgres'\">"
+ "SKIP LOCKED "
+ "</if>"
+ "<if test=\"_databaseId == 'db2'\">"
+ "SKIP LOCKED DATA "
+ "</if>"
+ "</if>"
+ "<if test=\"_databaseId == 'db2' and (selectAndClaim or lockResults != 0) \">WITH RS USE "
+ "AND KEEP UPDATE LOCKS </if>"
+ "<if test=\"_databaseId == 'db2' and !selectAndClaim \">WITH UR </if>"
+ "<if test=\"_databaseId == 'db2' and !selectAndClaim and lockResults==0 \">WITH UR </if>"
+ CLOSING_SCRIPT_TAG;
}

Expand Down Expand Up @@ -143,10 +152,7 @@ public static String queryTaskSummariesDb2() {
+ "<if test='!orderByOuter.isEmpty()'>"
+ "ORDER BY <foreach item='item' collection='orderByOuter' separator=',' >${item}</foreach>"
+ "</if> "
+ "<if test='selectAndClaim == true'>"
+ "FETCH FIRST ROW ONLY FOR UPDATE WITH RS USE AND KEEP UPDATE LOCKS"
+ "</if>"
+ "<if test='selectAndClaim == false'> with UR</if>"
+ "with UR "
+ CLOSING_SCRIPT_TAG;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,8 @@ public Optional<Task> selectAndClaim(TaskQuery taskQuery)
Optional.ofNullable(taskQuery.single())
.map(TaskSummary::getId)
.map(wrap(this::claim)));
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
return Optional.empty();
}
Expand Down

0 comments on commit ef05fc1

Please sign in to comment.