From 6ed56300662f449a6eeae96277e484c3e5d71629 Mon Sep 17 00:00:00 2001 From: meyerd Date: Thu, 12 Jan 2012 17:16:25 +0000 Subject: [PATCH] ACT-34 merging branch --- .../cfg/ProcessEngineConfigurationImpl.java | 3 +- ...onThread.java => AcquireJobsRunnable.java} | 274 +++++++++--------- .../engine/impl/jobexecutor/AcquiredJobs.java | 4 + .../impl/jobexecutor/DefaultJobExecutor.java | 138 +++++++++ .../engine/impl/jobexecutor/JobExecutor.java | 224 +++++--------- .../engine/test/bpmn/async/AsyncTaskTest.java | 26 +- 6 files changed, 368 insertions(+), 301 deletions(-) rename modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/{JobAcquisitionThread.java => AcquireJobsRunnable.java} (58%) create mode 100644 modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/DefaultJobExecutor.java diff --git a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/cfg/ProcessEngineConfigurationImpl.java b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/cfg/ProcessEngineConfigurationImpl.java index 36eb1b2236..27ddfd1370 100644 --- a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/cfg/ProcessEngineConfigurationImpl.java +++ b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/cfg/ProcessEngineConfigurationImpl.java @@ -81,6 +81,7 @@ import org.activiti.engine.impl.interceptor.CommandInterceptor; import org.activiti.engine.impl.interceptor.DelegateInterceptor; import org.activiti.engine.impl.interceptor.SessionFactory; import org.activiti.engine.impl.jobexecutor.AsyncContinuationJobHandler; +import org.activiti.engine.impl.jobexecutor.DefaultJobExecutor; import org.activiti.engine.impl.jobexecutor.JobExecutor; import org.activiti.engine.impl.jobexecutor.JobHandler; import org.activiti.engine.impl.jobexecutor.TimerCatchIntermediateEventJobHandler; @@ -642,7 +643,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig protected void initJobExecutor() { if (jobExecutor==null) { - jobExecutor = new JobExecutor(); + jobExecutor = new DefaultJobExecutor(); } jobHandlers = new HashMap(); diff --git a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobAcquisitionThread.java b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquireJobsRunnable.java similarity index 58% rename from modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobAcquisitionThread.java rename to modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquireJobsRunnable.java index 52161403ac..ed910a1f29 100644 --- a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobAcquisitionThread.java +++ b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquireJobsRunnable.java @@ -1,141 +1,133 @@ -/* Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.activiti.engine.impl.jobexecutor; - -import java.util.Date; -import java.util.List; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.activiti.engine.impl.Page; -import org.activiti.engine.impl.cmd.AcquireJobsCmd; -import org.activiti.engine.impl.interceptor.CommandExecutor; -import org.activiti.engine.impl.persistence.entity.TimerEntity; -import org.activiti.engine.impl.util.ClockUtil; - -/** - * Background thread responsible for retrieving the list of Jobs currently - * awaiting processing from the queue, and passing them to the - * {@link JobExecutor} to be run. There should only ever be one of these per - * {@link JobExecutor}. Note that in a clustered Environment, there can be - * multiple of these, so we need locking/transactions to ensure we don't fetch - * Jobs someone else already has. - */ -public class JobAcquisitionThread extends Thread { - - private static Logger log = Logger.getLogger(JobAcquisitionThread.class.getName()); - - private final AcquireJobsCmd acquireJobsCmd; - - private JobExecutor jobExecutor; - private boolean isActive = false; - private boolean isJobAdded = false; - - public JobAcquisitionThread(JobExecutor jobExecutor) { - super("JobAcquisitionThread"); - this.jobExecutor = jobExecutor; - this.acquireJobsCmd = new AcquireJobsCmd(jobExecutor); - } - - public void run() { - log.info(getName() + " starting to acquire jobs"); - this.isActive = true; - - CommandExecutor commandExecutor = jobExecutor.getCommandExecutor(); - long millisToWait = 0; - float waitIncreaseFactor = 2; - long maxWait = 60 * 1000; - - while (isActive) { - int maxJobsPerAcquisition = jobExecutor.getMaxJobsPerAcquisition(); - - try { - AcquiredJobs acquiredJobs = commandExecutor.execute(acquireJobsCmd); - - for (List jobIds : acquiredJobs.getJobIdBatches()) { - jobExecutor.executeJobs(jobIds); - } - - // if all jobs were executed - millisToWait = jobExecutor.getWaitTimeInMillis(); - int jobsAcquired = acquiredJobs.getJobIdBatches().size(); - if (jobsAcquired < maxJobsPerAcquisition) { - - isJobAdded = false; - - // check if the next timer should fire before the normal sleep time is over - Date duedate = new Date(ClockUtil.getCurrentTime().getTime() + millisToWait); - List nextTimers = commandExecutor.execute(new GetUnlockedTimersByDuedateCmd(duedate, new Page(0, 1))); - - if (!nextTimers.isEmpty()) { - long millisTillNextTimer = nextTimers.get(0).getDuedate().getTime() - ClockUtil.getCurrentTime().getTime(); - if (millisTillNextTimer < millisToWait) { - millisToWait = millisTillNextTimer; - } - } - - } else { - millisToWait = 0; - } - - } catch (Exception e) { - log.log(Level.SEVERE, "exception during job acquisition: " + e.getMessage(), e); - millisToWait *= waitIncreaseFactor; - if (millisToWait > maxWait) { - millisToWait = maxWait; - } - } - - if ((millisToWait > 0) && (!isJobAdded)) { - try { - log.fine("job acquisition thread sleeping for " + millisToWait + " millis"); - Thread.sleep(millisToWait); - log.fine("job acquisition thread woke up"); - } catch (InterruptedException e) { - log.fine("job acquisition wait interrupted"); - } - } - } - log.info(getName() + " stopped"); - } - - public void jobWasAdded() { - isJobAdded = true; - log.fine("Job was added. Interrupting " + this); - interrupt(); - } - - /** - * Triggers a shutdown - */ - public void shutdown() { - if (isActive) { - log.info(getName() + " is shutting down"); - isActive = false; - interrupt(); - try { - join(); - } catch (InterruptedException e) { - log.log(Level.WARNING, "Interruption while shutting down " + this.getClass().getName(), e); - } - } - } - - public JobExecutor getJobExecutor() { - return jobExecutor; - } - - public boolean isActive() { - return isActive; - } -} +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.activiti.engine.impl.jobexecutor; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.activiti.engine.impl.Page; +import org.activiti.engine.impl.interceptor.CommandExecutor; +import org.activiti.engine.impl.persistence.entity.TimerEntity; +import org.activiti.engine.impl.util.ClockUtil; + +/** + * + * @author Daniel Meyer + */ +public class AcquireJobsRunnable implements Runnable { + + private static Logger log = Logger.getLogger(AcquireJobsRunnable.class.getName()); + + protected final JobExecutor jobExecutor; + + protected volatile boolean isInterrupted = false; + protected volatile boolean isJobAdded = false; + protected final Object MONITOR = new Object(); + protected final AtomicBoolean isWaiting = new AtomicBoolean(false); + + public AcquireJobsRunnable(JobExecutor jobExecutor) { + this.jobExecutor = jobExecutor; + } + + public synchronized void run() { + log.info(jobExecutor.getName() + " starting to acquire jobs"); + + final CommandExecutor commandExecutor = jobExecutor.getCommandExecutor(); + long millisToWait = 0; + float waitIncreaseFactor = 2; + long maxWait = 60 * 1000; + + while (!isInterrupted) { + int maxJobsPerAcquisition = jobExecutor.getMaxJobsPerAcquisition(); + + try { + AcquiredJobs acquiredJobs = commandExecutor.execute(jobExecutor.getAcquireJobsCmd()); + + for (List jobIds : acquiredJobs.getJobIdBatches()) { + jobExecutor.executeJobs(jobIds); + } + + // if all jobs were executed + millisToWait = jobExecutor.getWaitTimeInMillis(); + int jobsAcquired = acquiredJobs.getJobIdBatches().size(); + if (jobsAcquired < maxJobsPerAcquisition) { + + isJobAdded = false; + + // check if the next timer should fire before the normal sleep time is over + Date duedate = new Date(ClockUtil.getCurrentTime().getTime() + millisToWait); + List nextTimers = commandExecutor.execute(new GetUnlockedTimersByDuedateCmd(duedate, new Page(0, 1))); + + if (!nextTimers.isEmpty()) { + long millisTillNextTimer = nextTimers.get(0).getDuedate().getTime() - ClockUtil.getCurrentTime().getTime(); + if (millisTillNextTimer < millisToWait) { + millisToWait = millisTillNextTimer; + } + } + + } else { + millisToWait = 0; + } + + } catch (Exception e) { + log.log(Level.SEVERE, "exception during job acquisition: " + e.getMessage(), e); + millisToWait *= waitIncreaseFactor; + if (millisToWait > maxWait) { + millisToWait = maxWait; + } + } + + if ((millisToWait > 0) && (!isJobAdded)) { + try { + log.fine("job acquisition thread sleeping for " + millisToWait + " millis"); + synchronized (MONITOR) { + if(!isInterrupted) { + isWaiting.set(true); + MONITOR.wait(millisToWait); + } + } + log.fine("job acquisition thread woke up"); + } catch (InterruptedException e) { + log.fine("job acquisition wait interrupted"); + } finally { + isWaiting.set(false); + } + } + } + log.info(jobExecutor.getName() + " stopped job acquisition"); + } + + public void stop() { + synchronized (MONITOR) { + isInterrupted = true; + if(isWaiting.compareAndSet(true, false)) { + MONITOR.notifyAll(); + } + } + } + + public void jobWasAdded() { + isJobAdded = true; + if(isWaiting.compareAndSet(true, false)) { + // ensures we only notify once + // I am OK with the race condition + synchronized (MONITOR) { + MONITOR.notifyAll(); + } + } + } + +} diff --git a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquiredJobs.java b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquiredJobs.java index 6f3cd694dc..e4e3441280 100644 --- a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquiredJobs.java +++ b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/AcquiredJobs.java @@ -39,6 +39,10 @@ public class AcquiredJobs { public boolean contains(String jobId) { return acquiredJobs.contains(jobId); } + + public int size() { + return acquiredJobs.size(); + } } diff --git a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/DefaultJobExecutor.java b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/DefaultJobExecutor.java new file mode 100644 index 0000000000..a518ac0ad4 --- /dev/null +++ b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/DefaultJobExecutor.java @@ -0,0 +1,138 @@ +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.activiti.engine.impl.jobexecutor; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.activiti.engine.ActivitiException; + +/** + *

This is a simple implementation of the {@link JobExecutor} using self-managed + * threads for performing background work.

+ * + *

This implementation uses a {@link ThreadPoolExecutor} backed by a queue to which + * work is submitted.

+ * + *

NOTE: use this class in environments in which self-management of threads + * is permitted. Consider using a different thread-management strategy in + * J(2)EE-Environments.

+ * + * @author Daniel Meyer + */ +public class DefaultJobExecutor extends JobExecutor { + + protected int queueSize = 3; + protected int corePoolSize = 3; + private int maxPoolSize = 10; + + protected Thread jobAcquisitionThread; + protected BlockingQueue threadPoolQueue; + protected ThreadPoolExecutor threadPoolExecutor; + + protected RejectedExecutionHandler rejectedExecutionHandler; + + protected void startExecutingJobs() { + if (threadPoolQueue==null) { + threadPoolQueue = new ArrayBlockingQueue(queueSize); + } + if (threadPoolExecutor==null) { + threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, threadPoolQueue); + if(rejectedExecutionHandler == null) { + rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); + } + threadPoolExecutor.setRejectedExecutionHandler(rejectedExecutionHandler); + } + if(jobAcquisitionThread == null) { + jobAcquisitionThread = new Thread(acquireJobsRunnable); + jobAcquisitionThread.setDaemon(true); + jobAcquisitionThread.start(); + } + } + + protected void stopExecutingJobs() { + // Ask the thread pool to finish and exit + threadPoolExecutor.shutdown(); + + // Waits for 1 minute to finish all currently executing jobs + try { + threadPoolExecutor.awaitTermination(60L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new ActivitiException("Timeout during shutdown of job executor. " + + "The current running jobs could not end withing 60 seconds after shutdown operation.", e); + } + + threadPoolExecutor = null; + jobAcquisitionThread = null; + } + + public void executeJobs(List jobIds) { + // TODO: RejectedExecutionException handling! + threadPoolExecutor.execute(new ExecuteJobsRunnable(commandExecutor, jobIds)); + } + + // getters and setters ////////////////////////////////////////////////////// + + public int getQueueSize() { + return queueSize; + } + + public void setQueueSize(int queueSize) { + this.queueSize = queueSize; + } + + public int getCorePoolSize() { + return corePoolSize; + } + + public void setCorePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; + } + + public int getMaxPoolSize() { + return maxPoolSize; + } + + public void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } + + public BlockingQueue getThreadPoolQueue() { + return threadPoolQueue; + } + + public void setThreadPoolQueue(BlockingQueue threadPoolQueue) { + this.threadPoolQueue = threadPoolQueue; + } + + public ThreadPoolExecutor getThreadPoolExecutor() { + return threadPoolExecutor; + } + + public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) { + this.threadPoolExecutor = threadPoolExecutor; + } + + protected RejectedExecutionHandler getRejectedExecutionHandler() { + return rejectedExecutionHandler; + } + + public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) { + this.rejectedExecutionHandler = rejectedExecutionHandler; + } + +} diff --git a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java index 115861e8bc..0baa9c9bae 100644 --- a/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java +++ b/modules/activiti-engine/src/main/java/org/activiti/engine/impl/jobexecutor/JobExecutor.java @@ -10,196 +10,131 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.activiti.engine.impl.jobexecutor; import java.util.List; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; -import org.activiti.engine.ActivitiException; +import org.activiti.engine.impl.cmd.AcquireJobsCmd; +import org.activiti.engine.impl.interceptor.Command; import org.activiti.engine.impl.interceptor.CommandExecutor; +import org.activiti.engine.runtime.Job; /** - * Manager class in charge of all background / asynchronous - * processing. - * You should generally only have one of these per Activiti - * instance in a JVM. In clustered situations, you can have - * multiple of these running against the same queue + - * pending job list. - * Uses a {@link ThreadPoolExecutor} internally. + *

Interface to the work management component of activiti.

+ * + *

This component is responsible for performing all background work + * ({@link Job Jobs}) scheduled by activiti.

+ * + *

You should generally only have one of these per Activiti instance (process + * engine) in a JVM. + * In clustered situations, you can have multiple of these running against the + * same queue + pending job list.

+ * + * @author Daniel Meyer */ -public class JobExecutor { +public abstract class JobExecutor { private static Logger log = Logger.getLogger(JobExecutor.class.getName()); + protected String name = "JobExecutor["+getClass().getName()+"]"; protected CommandExecutor commandExecutor; + protected Command acquireJobsCmd; + protected AcquireJobsRunnable acquireJobsRunnable; + protected boolean isAutoActivate = false; - + protected boolean isActive = false; + protected int maxJobsPerAcquisition = 3; protected int waitTimeInMillis = 5 * 1000; protected String lockOwner = UUID.randomUUID().toString(); protected int lockTimeInMillis = 5 * 60 * 1000; - protected int queueSize = 5; - protected int corePoolSize = 3; - private int maxPoolSize = 10; - - protected JobAcquisitionThread jobAcquisitionThread; - protected BlockingQueue threadPoolQueue; - protected ThreadPoolExecutor threadPoolExecutor; - protected boolean isActive = false; - - public synchronized void start() { - if(isActive) { - // Already started, nothing to do - log.info("Ignoring duplicate JobExecutor start invocation"); - return; - } else { - isActive = true; - - if (jobAcquisitionThread==null) { - jobAcquisitionThread = new JobAcquisitionThread(this); - } - if (threadPoolQueue==null) { - threadPoolQueue = new ArrayBlockingQueue(queueSize); - } - if (threadPoolExecutor==null) { - threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.MILLISECONDS, threadPoolQueue); - threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - } - // Create our pending jobs fetcher - log.fine("JobExecutor is starting the JobAcquisitionThread"); - jobAcquisitionThread.start(); + public void start() { + if (isActive) { + return; } + log.info("Starting up the JobExecutor["+getClass().getName()+"]."); + ensureInitialization(); + startExecutingJobs(); + isActive = true; } - public void shutdown() { - if(!isActive) { - log.info("Ignoring request to shut down non-active JobExecutor"); + public synchronized void shutdown() { + if (!isActive) { return; } - - log.info("Shutting down the JobExecutor"); - - // Ask the thread pool to finish and exit - threadPoolExecutor.shutdown(); - - // Waits for 1 minute to finish all currently executing jobs - try { - threadPoolExecutor.awaitTermination(60L, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new ActivitiException("Timeout during shutdown of job executor. " + - "The current running jobs could not end withing 60 seconds after shutdown operation.", e); - } - - // Close the pending jobs task - jobAcquisitionThread.shutdown(); - + log.info("Shutting down the JobExecutor["+getClass().getName()+"]."); + acquireJobsRunnable.stop(); + stopExecutingJobs(); + ensureCleanup(); isActive = false; - - // Clear references - threadPoolExecutor = null; - jobAcquisitionThread = null; } + protected void ensureInitialization() { + acquireJobsCmd = new AcquireJobsCmd(this); + acquireJobsRunnable = new AcquireJobsRunnable(this); + } + + protected void ensureCleanup() { + acquireJobsCmd = null; + acquireJobsRunnable = null; + } - /** - * Used to hint that new work exists on the - * queue, and that the {@link JobAcquisitionThread} - * should probably re-check for jobs. - */ public void jobWasAdded() { - if ( isActive - && jobAcquisitionThread != null - && jobAcquisitionThread.isActive() - ) { - jobAcquisitionThread.jobWasAdded(); + if(isActive) { + acquireJobsRunnable.jobWasAdded(); } } - public void executeJobs(List jobIds) { - // TODO: RejectedExecutionException handling! - threadPoolExecutor.execute(new ExecuteJobsRunnable(commandExecutor, jobIds)); - } - - // getters and setters ////////////////////////////////////////////////////// + protected abstract void startExecutingJobs(); + protected abstract void stopExecutingJobs(); + protected abstract void executeJobs(List jobIds); + + // getters and setters ////////////////////////////////////////////////////// public CommandExecutor getCommandExecutor() { return commandExecutor; } - + public int getWaitTimeInMillis() { return waitTimeInMillis; } - + public void setWaitTimeInMillis(int waitTimeInMillis) { this.waitTimeInMillis = waitTimeInMillis; } - + public int getLockTimeInMillis() { return lockTimeInMillis; } - + public void setLockTimeInMillis(int lockTimeInMillis) { this.lockTimeInMillis = lockTimeInMillis; } - - public int getQueueSize() { - return queueSize; - } - - public void setQueueSize(int queueSize) { - this.queueSize = queueSize; - } - - public int getCorePoolSize() { - return corePoolSize; - } - - public void setCorePoolSize(int corePoolSize) { - this.corePoolSize = corePoolSize; - } - public int getMaxPoolSize() { - return maxPoolSize; + public String getLockOwner() { + return lockOwner; } - public void setMaxPoolSize(int maxPoolSize) { - this.maxPoolSize = maxPoolSize; - } - - public JobAcquisitionThread getJobAcquisitionThread() { - return jobAcquisitionThread; - } - - public void setJobAcquisitionThread(JobAcquisitionThread jobAcquisitionThread) { - this.jobAcquisitionThread = jobAcquisitionThread; - } - - public BlockingQueue getThreadPoolQueue() { - return threadPoolQueue; + public void setLockOwner(String lockOwner) { + this.lockOwner = lockOwner; } - public void setThreadPoolQueue(BlockingQueue threadPoolQueue) { - this.threadPoolQueue = threadPoolQueue; + public boolean isAutoActivate() { + return isAutoActivate; } - public ThreadPoolExecutor getThreadPoolExecutor() { - return threadPoolExecutor; - } - - public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) { - this.threadPoolExecutor = threadPoolExecutor; + public void setCommandExecutor(CommandExecutor commandExecutor) { + this.commandExecutor = commandExecutor; } - - public boolean isActive() { - return isActive; + + public void setAutoActivate(boolean isAutoActivate) { + this.isAutoActivate = isAutoActivate; } - + public int getMaxJobsPerAcquisition() { return maxJobsPerAcquisition; } @@ -207,24 +142,21 @@ public class JobExecutor { public void setMaxJobsPerAcquisition(int maxJobsPerAcquisition) { this.maxJobsPerAcquisition = maxJobsPerAcquisition; } - - public String getLockOwner() { - return lockOwner; - } - public void setLockOwner(String lockOwner) { - this.lockOwner = lockOwner; - } - - public boolean isAutoActivate() { - return isAutoActivate; + public String getName() { + return name; } - public void setCommandExecutor(CommandExecutor commandExecutor) { - this.commandExecutor = commandExecutor; + public Command getAcquireJobsCmd() { + return acquireJobsCmd; } - public void setAutoActivate(boolean isAutoActivate) { - this.isAutoActivate = isAutoActivate; + public void setAcquireJobsCmd(Command acquireJobsCmd) { + this.acquireJobsCmd = acquireJobsCmd; } + + public boolean isActive() { + return isActive; + } + } diff --git a/modules/activiti-engine/src/test/java/org/activiti/engine/test/bpmn/async/AsyncTaskTest.java b/modules/activiti-engine/src/test/java/org/activiti/engine/test/bpmn/async/AsyncTaskTest.java index cbc2bf105d..1a48407b6c 100644 --- a/modules/activiti-engine/src/test/java/org/activiti/engine/test/bpmn/async/AsyncTaskTest.java +++ b/modules/activiti-engine/src/test/java/org/activiti/engine/test/bpmn/async/AsyncTaskTest.java @@ -39,7 +39,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // the service was not invoked: assertFalse(INVOCATION); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // the service was invoked assertTrue(INVOCATION); @@ -54,7 +54,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // the listener was not yet invoked: assertNull(runtimeService.getVariable(pid, "listener")); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); assertEquals(0, managementService.createJobQuery().count()); } @@ -69,7 +69,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // the service was not invoked: assertFalse(INVOCATION); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // the service was invoked assertTrue(INVOCATION); @@ -87,7 +87,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // the service was not invoked: assertFalse(INVOCATION); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // the service was invoked assertTrue(INVOCATION); @@ -107,7 +107,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { fail("the job must be a message"); } - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // the service failed: the execution is still sitting in the service task: Execution execution = runtimeService.createExecutionQuery().singleResult(); @@ -130,7 +130,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { assertEquals(2, managementService.createJobQuery().count()); // let 'max-retires' on the message be reached - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // the service failed: the execution is still sitting in the service task: Execution execution = runtimeService.createExecutionQuery().singleResult(); @@ -142,7 +142,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // now the timer triggers: ClockUtil.setCurrentTime(new Date(System.currentTimeMillis()+10000)); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // and we are done: assertNull(runtimeService.createExecutionQuery().singleResult()); @@ -161,7 +161,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // the service was not invoked: assertFalse(INVOCATION); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // the service was invoked assertTrue(INVOCATION); @@ -177,7 +177,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { assertEquals(1, managementService.createJobQuery().count()); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // both the timer and the message are cancelled assertEquals(0, managementService.createJobQuery().count()); @@ -191,7 +191,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // now there should be one job in the database: assertEquals(1, managementService.createJobQuery().count()); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // the job is done assertEquals(0, managementService.createJobQuery().count()); @@ -207,7 +207,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { String eid = runtimeService.createExecutionQuery().singleResult().getId(); assertNull(runtimeService.getVariable(eid, "invoked")); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // and the job is done assertEquals(0, managementService.createJobQuery().count()); @@ -226,7 +226,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // now there should be one job in the database: assertEquals(1, managementService.createJobQuery().count()); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); assertEquals(0, managementService.createJobQuery().count()); @@ -243,7 +243,7 @@ public class AsyncTaskTest extends PluggableActivitiTestCase { // there is no usertask assertNull(taskService.createTaskQuery().singleResult()); - waitForJobExecutorToProcessAllJobs(5000L, 25L); + waitForJobExecutorToProcessAllJobs(10000L, 25L); // the listener was now invoked: assertNotNull(runtimeService.getVariable(pid, "listener")); -- GitLab