Skip to content

Commit

Permalink
[JBPM-10088] Removing timers when rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Nov 16, 2023
1 parent ababa5f commit 27c29b1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerHandle;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

import org.drools.core.time.JobHandle;
import org.drools.core.time.impl.TimerJobInstance;
Expand Down Expand Up @@ -111,60 +109,49 @@ public void executeTimerJob(Timer timer) {
Thread.currentThread().interrupt();
}
try {
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
((Callable<?>) timerJobInstance).call();
} catch (Exception e) {
recoverTimerJobInstance(timerJob, timer, e);
}
}

private void executeTimerJobInstance(TimerJobInstance timerJobInstance) throws Exception {
((Callable<?>) timerJobInstance).call();
}

private void recoverTimerJobInstance(EjbTimerJob ejbTimerJob, Timer timer, Exception cause) {
Transaction<TimerJobInstance> tx;

TimerJobInstance timerJobInstance = ejbTimerJob.getTimerJobInstance();
if (isSessionNotFound(cause)) {
// if session is not found means the process has already finished. In this case we just need to remove
// the timer and avoid any recovery as it should not trigger any more timers.
tx = timerJobInstance -> {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause);
if (!removeJob(timerJobInstance.getJobHandle(), timer)) {
logger.warn("Trying to recover timer. Not possible due to process instance is not found. More likely already completed. Timer {} won't be recovered", timerJobInstance, cause);
if (!removeJob(timerJobInstance.getJobHandle(), timer)) {
logger.warn("Session not found for timer {}. Timer could not removed.", timerJobInstance);
}
};
}
}
else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != null) {
else if (timerJobInstance.getTrigger().hasNextFireTime() != null) {
// this is an interval trigger. Problem here is that the timer scheduled by DefaultTimerJobInstance is lost
// because of the transaction, so we need to do this here.
tx = timerJobInstance -> {
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
if (removeJob(timerJobInstance.getJobHandle(), null)) {
internalSchedule(timerJobInstance);
} else {

logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
if (removeJob(timerJobInstance.getJobHandle(), null)) {
internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance);
}
};
}

}
else {
// if there is not next date to be fired, we need to apply policy otherwise will be lost
tx = timerJobInstance -> {
logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance);
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance);
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT);
} else {
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle());
((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer));
}
};
}
try {
invokeTransaction (tx, ejbTimerJob.getTimerJobInstance());
} catch (Exception e) {
logger.error("Failed to executed timer recovery", e);
logger.warn("Execution of time failed. The timer will be retried {}", timerJobInstance);
ZonedDateTime nextRetry = ZonedDateTime.now().plus(TIMER_RETRY_INTERVAL, ChronoUnit.MILLIS);
EjbTimerJobRetry info = ejbTimerJob instanceof EjbTimerJobRetry ? ((EjbTimerJobRetry) ejbTimerJob).next() : new EjbTimerJobRetry(timerJobInstance);
if (TIMER_RETRY_LIMIT > 0 && info.getRetry() > TIMER_RETRY_LIMIT) {
logger.warn("The timer {} reached retry limit {}. It won't be retried again", timerJobInstance, TIMER_RETRY_LIMIT);
} else {
TimerConfig config = new TimerConfig(info, true);
Timer newTimer = timerService.createSingleActionTimer(Date.from(nextRetry.toInstant()), config);
((GlobalJpaTimerJobInstance) timerJobInstance).setTimerInfo(newTimer.getHandle());
((GlobalJpaTimerJobInstance) timerJobInstance).setExternalTimerId(getPlatformTimerId(newTimer));
}
}
}

Expand All @@ -179,20 +166,6 @@ private boolean isSessionNotFound(Exception e) {
return false;
}

@FunctionalInterface
private interface Transaction<I> {
void doWork(I item) throws Exception;
}

@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
operation.doWork(item);
}

private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
ctx.getBusinessObject(EJBTimerScheduler.class).transaction(operation,item);
}

public void internalSchedule(TimerJobInstance timerJobInstance) {
Serializable info = removeTransientFields(new EjbTimerJob(timerJobInstance));
TimerConfig config = new TimerConfig(info, true);
Expand All @@ -218,7 +191,7 @@ private String getPlatformTimerId(Timer timer) {
Method method = timer.getClass().getMethod("getId");
return (String) method.invoke(timer);
} catch (Exception timerIdException) {
logger.trace("Failed to get the platform timer id {}", timerIdException.getMessage(), timerIdException);
logger.trace("Failed to get the platform timer id", timerIdException);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
public boolean removeJob(JobHandle jobHandle) {
String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid();
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid));
if (TRANSACTIONAL && ejbTimer == null) {
logger.warn("EJB timer is null for uuid {} and transactional flag is enabled", uuid);
return false;
}
boolean result = scheduler.removeJob(jobHandle, ejbTimer);
logger.debug("Remove job returned {}", result);
return result;
Expand All @@ -127,7 +123,7 @@ private Timer getEjbTimer(TimerMappingInfo timerMappingInfo) {
byte[] data = timerMappingInfo.getInfo();
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(data)).readObject()).getTimer();
} catch (Exception e) {
logger.warn("wast not able to deserialize info field from timer info for uuid");
logger.warn("Problem retrieving timer for uuid {}", timerMappingInfo.getUuid(), e);
return null;
}
}
Expand Down

0 comments on commit 27c29b1

Please sign in to comment.