Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.67.x-blue] [JBPM-10088] Removing timers when rollback #2371

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.drools.core.time.TimerService;
import org.drools.core.time.impl.TimerJobInstance;
import org.jbpm.process.core.timer.impl.GlobalTimerService.GlobalJobHandle;
import org.kie.internal.runtime.manager.RuntimeEnvironment;

/**
* Implementations of these interface are responsible for scheduled jobs in global manner,
Expand Down Expand Up @@ -87,4 +88,6 @@ default void invalidate(JobHandle jobHandle) {
default TimerJobInstance getTimerJobInstance(long processInstanceId, long timerId) {
return null;
}

default void setEnvironment(RuntimeEnvironment environment) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public DefaultRuntimeEnvironment(EntityManagerFactory emf, GlobalSchedulerServic
this.usePersistence = true;
this.userGroupCallback = UserDataServiceProvider.getUserGroupCallback();
this.userInfo = UserDataServiceProvider.getUserInfo();
if (globalSchedulerService != null) {
globalSchedulerService.setEnvironment(this);
}
}

public DefaultRuntimeEnvironment(EntityManagerFactory emf, boolean usePersistence) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import javax.ejb.Timer;
import javax.ejb.TimerConfig;
import javax.ejb.TimerHandle;
import javax.ejb.TimerService;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

Expand Down Expand Up @@ -73,7 +74,7 @@ public class EJBTimerScheduler {
private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>();

@Resource
protected javax.ejb.TimerService timerService;
protected TimerService timerService;

@Resource
protected SessionContext ctx;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void executeTimerJob(Timer timer) {
Thread.currentThread().interrupt();
}
try {
invokeTransaction(this::executeTimerJobInstance, timerJobInstance);
executeTimerJobInstance(timerJobInstance);
} catch (Exception e) {
recoverTimerJobInstance(timerJob, timer, e);
}
Expand Down Expand Up @@ -186,7 +187,12 @@ private interface Transaction<I> {

@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
public <I> void transaction(Transaction<I> operation, I item) throws Exception {
operation.doWork(item);
try {
operation.doWork(item);
} catch (Exception transactionEx) {
ctx.setRollbackOnly();
throw transactionEx;
}
}

private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,42 @@
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;

import org.drools.core.time.InternalSchedulerService;
import org.drools.core.time.Job;
import org.drools.core.time.JobContext;
import org.drools.core.time.JobHandle;
import org.drools.core.time.TimerService;
import org.drools.core.time.Trigger;
import org.drools.core.time.impl.TimerJobInstance;
import org.drools.persistence.api.TransactionManager;
import org.drools.persistence.api.TransactionManagerFactory;
import org.drools.persistence.jta.JtaTransactionManager;
import org.jbpm.process.core.timer.GlobalSchedulerService;
import org.jbpm.process.core.timer.JobNameHelper;
import org.jbpm.process.core.timer.NamedJobContext;
import org.jbpm.process.core.timer.SchedulerServiceInterceptor;
import org.jbpm.process.core.timer.impl.DelegateSchedulerServiceInterceptor;
import org.jbpm.process.core.timer.impl.GlobalTimerService;
import org.jbpm.process.core.timer.impl.GlobalTimerService.GlobalJobHandle;
import org.jbpm.runtime.manager.impl.SimpleRuntimeEnvironment;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.runtime.manager.impl.jpa.TimerMappingInfo;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.RuntimeEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class EjbSchedulerService implements GlobalSchedulerService {
private static final Logger logger = LoggerFactory.getLogger(EjbSchedulerService.class);

private static final Boolean TRANSACTIONAL = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.tx", "true"));

private AtomicLong idCounter = new AtomicLong();
private TimerService globalTimerService;
private GlobalTimerService globalTimerService;
private EJBTimerScheduler scheduler;

private SchedulerServiceInterceptor interceptor = new DelegateSchedulerServiceInterceptor(this);


@Override
public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
long id = idCounter.getAndIncrement();
String jobName = getJobName(ctx, id);
EjbGlobalJobHandle jobHandle = new EjbGlobalJobHandle(id, jobName, ((GlobalTimerService) globalTimerService).getTimerServiceId());
EjbGlobalJobHandle jobHandle = new EjbGlobalJobHandle(id, jobName, globalTimerService.getTimerServiceId());

TimerJobInstance jobInstance = null;
// check if given timer job is marked as new timer meaning it was never scheduled before,
Expand All @@ -89,7 +82,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
ctx,
trigger,
jobHandle,
(InternalSchedulerService) globalTimerService);
globalTimerService);

jobHandle.setTimerJobInstance((TimerJobInstance) jobInstance);
interceptor.internalSchedule(jobInstance);
Expand All @@ -100,10 +93,6 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) {
public boolean removeJob(JobHandle jobHandle) {
String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid();
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid));
if (TRANSACTIONAL && ejbTimer == null) {
logger.warn("EJB timer is null for uuid {} and transactional flag is enabled", uuid);
return false;
}
boolean result = scheduler.removeJob(jobHandle, ejbTimer);
logger.debug("Remove job returned {}", result);
return result;
Expand All @@ -113,7 +102,6 @@ private TimerJobInstance getTimerJobInstance (String uuid) {
return unwrapTimerJobInstance(getEjbTimer(getTimerMappinInfo(uuid)));
}


@Override
public TimerJobInstance getTimerJobInstance(long processInstanceId, long timerId) {
return unwrapTimerJobInstance(getEjbTimer(getTimerMappinInfo(processInstanceId, timerId)));
Expand All @@ -124,10 +112,9 @@ private Timer getEjbTimer(TimerMappingInfo timerMappingInfo) {
if(timerMappingInfo == null || timerMappingInfo.getInfo() == null) {
return null;
}
byte[] data = timerMappingInfo.getInfo();
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(data)).readObject()).getTimer();
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(timerMappingInfo.getInfo())).readObject()).getTimer();
} catch (Exception e) {
logger.warn("wast not able to deserialize info field from timer info for uuid");
logger.warn("Problem retrieving timer for uuid {}", timerMappingInfo.getUuid(), e);
return null;
}
}
Expand All @@ -146,30 +133,17 @@ private TimerMappingInfo getTimerMappinInfo(long processInstanceId, long timerId
}

private TimerMappingInfo getTimerMappingInfo(Function<EntityManager, List<TimerMappingInfo>> func) {
InternalRuntimeManager manager = ((GlobalTimerService) globalTimerService).getRuntimeManager();
String pu = ((InternalRuntimeManager) manager).getDeploymentDescriptor().getPersistenceUnit();
EntityManagerFactory emf = EntityManagerFactoryManager.get().getOrCreate(pu);
EntityManager em = emf.createEntityManager();
JtaTransactionManager tm = (JtaTransactionManager) TransactionManagerFactory.get().newTransactionManager();
boolean txOwner = false;
EntityManager em = EntityManagerFactoryManager.get()
.getOrCreate(globalTimerService.getRuntimeManager()
.getDeploymentDescriptor().getPersistenceUnit())
.createEntityManager();
try {
if (tm != null && tm.getStatus() == TransactionManager.STATUS_ROLLEDBACK) {
txOwner = tm.begin();
}
List<TimerMappingInfo> info = func.apply(em);
if (!info.isEmpty()) {
return info.get(0);
} else {
return null;
}

return !info.isEmpty() ? info.get(0) : null;
} catch (Exception ex) {
logger.warn("Error getting mapping info ",ex);
return null;
} finally {
if (tm != null) {
tm.commit(txOwner);
}
em.close();
}
}
Expand Down Expand Up @@ -200,7 +174,7 @@ public void internalSchedule(TimerJobInstance timerJobInstance) {

@Override
public void initScheduler(TimerService timerService) {
this.globalTimerService = timerService;
this.globalTimerService = (GlobalTimerService)timerService;
try {
this.scheduler = InitialContext.doLookup("java:module/EJBTimerScheduler");
} catch (NamingException e) {
Expand All @@ -211,18 +185,17 @@ public void initScheduler(TimerService timerService) {
@Override
public void shutdown() {
// managed by container - no op

}

@Override
public JobHandle buildJobHandleForContext(NamedJobContext ctx) {

return new EjbGlobalJobHandle(-1, getJobName(ctx, -1L), ((GlobalTimerService) globalTimerService).getTimerServiceId());
return new EjbGlobalJobHandle(-1, getJobName(ctx, -1L), globalTimerService.getTimerServiceId());
}

@Override
public boolean isTransactional() {
return TRANSACTIONAL;
return true;
}

@Override
Expand All @@ -237,11 +210,18 @@ public void setInterceptor(SchedulerServiceInterceptor interceptor) {

@Override
public boolean isValid(GlobalJobHandle jobHandle) {

return true;
return true;
}

protected String getJobName(JobContext ctx, long id) {
return JobNameHelper.getJobName(ctx, id);
}

@Override
public
void setEnvironment(RuntimeEnvironment environment) {
if (environment instanceof SimpleRuntimeEnvironment) {
((SimpleRuntimeEnvironment)environment).addToEnvironment("IS_TIMER_CMT", true);
}
}
}
Loading