Skip to content

Commit

Permalink
[JBPM-10209] Switching to CMT (#2362)
Browse files Browse the repository at this point in the history
* [JBPM-10209] Switching to CMT

* [JBPM-10209] A bit of refactor

* [JBPM-10209] Gonzalos comments

* [JBPM-10209] Disposable handling
  • Loading branch information
fjtirado authored Nov 16, 2023
1 parent f897fcc commit ababa5f
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 154 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.jbpm.process.core;

import org.kie.internal.runtime.manager.Disposable;
import org.kie.internal.runtime.manager.InternalRuntimeEngine;

public interface DisposableRuntimeEngine extends InternalRuntimeEngine, Disposable {
boolean isDisposed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.drools.core.time.impl.DefaultJobHandle;
import org.drools.core.time.impl.TimerJobFactoryManager;
import org.drools.core.time.impl.TimerJobInstance;
import org.jbpm.process.core.DisposableRuntimeEngine;
import org.jbpm.process.core.timer.GlobalSchedulerService;
import org.jbpm.process.core.timer.NamedJobContext;
import org.jbpm.process.instance.timer.TimerManager.ProcessJobContext;
Expand Down Expand Up @@ -420,7 +421,10 @@ public Environment getEnvironment() {

return runtime.getKieSession().getEnvironment();
}


public boolean isDisposed() {
return runtime instanceof DisposableRuntimeEngine && ((DisposableRuntimeEngine)runtime).isDisposed();
}
}

public List<TimerServiceListener> getListeners() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public GlobalJpaTimerJobInstance(Job job, JobContext ctx, Trigger trigger,
@Override
public Void call() throws Exception {
AsyncExecutionMarker.markAsync();
ExecutableRunner runner = null;
ExecutableRunner<?> runner = null;
TransactionManager jtaTm = null;
boolean success = false;
try {
Expand All @@ -88,16 +88,16 @@ public Void call() throws Exception {
success = true;
return null;
} catch( Exception e ) {
e.printStackTrace();
logger.error("Exception executing timer", e);
success = false;
throw e;
} finally {
AsyncExecutionMarker.reset();
if (runner != null && runner instanceof DisposableCommandService) {
if (allowedToDispose(((DisposableCommandService) runner).getEnvironment())) {
logger.debug("Allowed to dispose command service from global timer job instance");
((DisposableCommandService) runner).dispose();
}
if (runner != null && runner instanceof DisposableCommandService) {
if (allowedToDispose(((DisposableCommandService) runner))) {
logger.debug("Allowed to dispose command service from global timer job instance");
((DisposableCommandService) runner).dispose();
}
}
closeTansactionIfNeeded(jtaTm, success);
}
Expand All @@ -123,7 +123,11 @@ public String toString() {
return "GlobalJpaTimerJobInstance [timerServiceId=" + timerServiceId
+ ", getJobHandle()=" + getJobHandle() + "]";
}


private boolean allowedToDispose(DisposableCommandService disposableCommandService) {
return !disposableCommandService.isDisposed() && allowedToDispose (disposableCommandService.getEnvironment());
}

protected boolean allowedToDispose(Environment environment) {
if (hasEnvironmentEntry(environment, "IS_JTA_TRANSACTION", false)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
import java.util.concurrent.CopyOnWriteArrayList;

import org.jbpm.process.audit.JPAAuditLogService;
import org.jbpm.process.core.DisposableRuntimeEngine;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.manager.Context;
import org.kie.api.runtime.manager.RuntimeManager;
import org.kie.api.runtime.manager.audit.AuditService;
import org.kie.api.task.TaskService;
import org.kie.internal.runtime.manager.Disposable;
import org.kie.internal.runtime.manager.DisposeListener;
import org.kie.internal.runtime.manager.InternalRuntimeEngine;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.SessionNotFoundException;

Expand All @@ -36,7 +35,7 @@
* and work item handlers might be interested in receiving notification when the runtime engine is disposed of,
* in order deactivate themselves too and not receive any other events.
*/
public class RuntimeEngineImpl implements InternalRuntimeEngine, Disposable {
public class RuntimeEngineImpl implements DisposableRuntimeEngine {

private RuntimeEngineInitlializer initializer;
private Context<?> context;
Expand Down Expand Up @@ -143,6 +142,7 @@ public void setManager(RuntimeManager manager) {
this.manager = manager;
}

@Override
public boolean isDisposed() {
return disposed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,18 @@

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.NoSuchObjectLocalException;
import javax.ejb.SessionContext;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.ejb.Timeout;
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerHandle;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.transaction.RollbackException;
import javax.transaction.UserTransaction;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import org.drools.core.time.JobHandle;
import org.drools.core.time.impl.TimerJobInstance;
Expand All @@ -58,8 +55,6 @@

@Singleton
@Startup
@ConcurrencyManagement(ConcurrencyManagementType.CONTAINER)
@TransactionManagement(TransactionManagementType.BEAN)
@Lock(LockType.READ)
public class EJBTimerScheduler {

Expand All @@ -70,17 +65,19 @@ public class EJBTimerScheduler {
private static final Integer TIMER_RETRY_LIMIT = Integer.parseInt(System.getProperty("org.kie.jbpm.timer.retry.limit", "3"));

private static final Integer OVERDUE_WAIT_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.wait", "20000"));

private static final Integer OVERDUE_CHECK_TIME = Integer.parseInt(System.getProperty("org.jbpm.overdue.timer.check", "200"));

private boolean useLocalCache = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.local.cache", "false"));

private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();

@Resource
protected javax.ejb.TimerService timerService;

@Resource
protected UserTransaction utx;

protected SessionContext ctx;
public void setUseLocalCache(boolean useLocalCache) {
this.useLocalCache = useLocalCache;
}
Expand All @@ -97,112 +94,78 @@ public void executeTimerJob(Timer timer) {
EjbTimerJob timerJob = (EjbTimerJob) timer.getInfo();
TimerJobInstance timerJobInstance = timerJob.getTimerJobInstance();
logger.debug("About to execute timer for job {}", timerJob);

String timerServiceId = ((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId();

// handle overdue timers as ejb timer service might start before all deployments are ready
long time = 0;
while (TimerServiceRegistry.getInstance().get(timerServiceId) == null) {
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
time += 500;

if (time > OVERDUE_WAIT_TIME) {
logger.debug("No timer service found after waiting {} ms", time);
break;
try {
while (TimerServiceRegistry.getInstance().get(((EjbGlobalJobHandle) timerJobInstance.getJobHandle()).getDeploymentId()) == null) {
logger.debug("waiting for timer service to be available, elapsed time {} ms", time);
Thread.sleep(OVERDUE_CHECK_TIME);
time += OVERDUE_CHECK_TIME;
if (time > OVERDUE_WAIT_TIME) {
logger.debug("No timer service found after waiting {} ms", time);
break;
}
}
}
} catch (InterruptedException e) {
logger.warn("Thread has been interrupted", e);
Thread.currentThread().interrupt();
}
try {
transaction(this::executeTimerJobInstance, timerJobInstance);
} catch (SessionNotFoundException e) {
logger.warn("Process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, e);
removeUnrecoverableTimer(timerJob, timer);
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
} catch (Exception e) {
recoverTimerJobInstance(timerJob, timer, e);
}
}

private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws Exception {
try {
((Callable<?>) timerJobInstance).call();
} catch (Exception e) {
throw e;
}
((Callable<?>) timerJobInstance).call();
}

private void removeUnrecoverableTimer(EjbTimerJob ejbTimerJob, Timer timer) {
try {
Transaction<TimerJobInstance> tx = timerJobInstance -> {
if (!this.removeJob(timerJobInstance.getJobHandle(), timer)) {
logger.warn("Session not found for timer {}. Timer could not removed.", ejbTimerJob.getTimerJobInstance());
}
};
transaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("There was a problem during timer removal {}", ejbTimerJob.getTimerJobInstance(), e1);
}
}

private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception e) {
if (isSessionNotFound(e)) {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", ejbTimerJob.getTimerJobInstance(), e);
private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception cause) {
Transaction<TimerJobInstance> tx;
if (isSessionNotFound(cause)) {
// if session is not found means the process has already finished. In this case we just need to remove
// the timer and avoid any recovery as it should not trigger any more timers.
removeUnrecoverableTimer(ejbTimerJob, timer);
return;
tx = timerJobInstance -> {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause);
if (!removeJob(timerJobInstance.getJobHandle(), timer)) {
logger.warn("Session not found for timer {}. Timer could not removed.", timerJobInstance);
}
};
}

if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
// this is an interval trigger. Problem here is that the timer scheduled by DefaultTimerJobInstance is lost
// because of the transaction, so we need to do this here.
try {

logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", ejbTimerJob.getTimerJobInstance());
Transaction<TimerJobInstance> tx = timerJobInstance -> {
if (this.removeJob(timerJobInstance.getJobHandle(), null)) {
this.internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", ejbTimerJob.getTimerJobInstance());
}
};
transaction(tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.warn("Could not schedule the interval trigger {}", ejbTimerJob.getTimerJobInstance(), e1);
}
return;
tx = timerJobInstance -> {
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
if (removeJob(timerJobInstance.getJobHandle(), null)) {
internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance);
}
};
}
else {
// if there is not next date to be fired, we need to apply policy otherwise will be lost
tx = timerJobInstance -> {
logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance);
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance);
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT);
} else {
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle());
((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer));
}
};
}

// if there is not next date to be fired, we need to apply policy otherwise will be lost

logger.warn("Execution of time failed. The timer will be retried {}", ejbTimerJob.getTimerJobInstance());
Transaction<TimerJobInstance> operation = (instance) -> {
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = null;
if(ejbTimerJob instanceof EjbTimerJobRetry) {
info = ((EjbTimerJobRetry) ejbTimerJob).next();
} else {
info = new EjbTimerJobRetry(instance);
}
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", instance, TIMER_RETRY_LIMIT);
return;
}
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
TimerHandle handler = newTimer.getHandle();
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setTimerInfo(handler);
((GlobalJpaTimerJobInstance) ejbTimerJob.getTimerJobInstance()).setExternalTimerId(getPlatformTimerId(newTimer));
};
try {
transaction(operation, ejbTimerJob.getTimerJobInstance());
} catch (Exception e1) {
logger.error("Failed to executed timer recovery {}", e1.getMessage(), e1);
invokeTransaction (tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e) {
logger.error("Failed to executed timer recovery", e);
}

}

private boolean isSessionNotFound(Exception e) {
Expand All @@ -218,30 +181,16 @@ private boolean isSessionNotFound(Exception e) {

@FunctionalInterface
private interface Transaction<I> {

void doWork(I item) throws Exception;
}

private <I> void transaction(Transaction<I> operation, I item) throws Exception {
try {
utx.begin();
operation.doWork(item);
utx.commit();
} catch(RollbackException e) {
logger.warn("Transaction was rolled back for {} with status {}", item, utx.getStatus());
if(utx.getStatus() == javax.transaction.Status.STATUS_ACTIVE) {
utx.rollback();
}
throw new RuntimeException("jbpm timer has been rolledback", e);
} catch (Exception e) {
try {
utx.rollback();
} catch (Exception re) {
logger.error("transaction could not be rolled back", re);
}

throw e;
}
@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
operation.doWork(item);
}

private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
ctx.getBusinessObject(EJBTimerScheduler.class).transaction(operation,item);
}

public void internalSchedule(TimerJobInstance timerJobInstance) {
Expand Down
Loading

0 comments on commit ababa5f

Please sign in to comment.