Commit 83db9c8c authored by Joram Barrez


Bringing the hazelcast async executor up to speed with the latest changes in the DefaultAsyncJobExecutor (in fact, extending from it now)
Parent 4083b37f
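For reference, a minimal, untested bootstrap sketch showing how one of these executors can be wired into an engine programmatically. It mirrors the asyncExecutor / asyncExecutorEnabled / asyncExecutorActivate properties used in the Spring configuration further down in this commit; the plain JDBC settings (instead of the Tomcat connection pool) and the class name EngineBootstrapSketch are illustrative assumptions only.

package org.activiti.test.async.executor.hazelcast;

import org.activiti.async.executor.hazelcast.HazelCastDistributedQueueBasedAsyncExecutor;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration;

public class EngineBootstrapSketch {

  public static void main(String[] args) {
    StandaloneProcessEngineConfiguration config = new StandaloneProcessEngineConfiguration();

    // Same database settings as the cfg.xml files in this commit, but without the Tomcat pool
    config.setJdbcDriver("com.mysql.jdbc.Driver");
    config.setJdbcUrl("jdbc:mysql://localhost:3306/activiti_job?characterEncoding=UTF-8");
    config.setJdbcUsername("activiti");
    config.setJdbcPassword("activiti");

    // Disable the old job executor, enable the async executor and back it with Hazelcast
    config.setJobExecutorActivate(false);
    config.setAsyncExecutorEnabled(true);
    config.setAsyncExecutorActivate(true);
    config.setAsyncExecutor(new HazelCastDistributedQueueBasedAsyncExecutor());

    ProcessEngine processEngine = config.buildProcessEngine();
    System.out.println("Engine started: " + processEngine.getName());
  }
}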
......@@ -23,6 +23,42 @@
<artifactId>hazelcast</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.30</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jdbc</artifactId>
<version>8.0.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.uuid</groupId>
<artifactId>java-uuid-generator</artifactId>
<version>3.1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
......
package org.activiti.async.executor.hazelcast;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.activiti.engine.ProcessEngines;
import org.activiti.engine.impl.asyncexecutor.AsyncExecutor;
import org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor;
import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti.engine.impl.cmd.ExecuteAsyncJobCmd;
import org.activiti.engine.impl.interceptor.CommandExecutor;
......@@ -21,6 +21,8 @@ import com.hazelcast.core.IExecutorService;
import com.hazelcast.monitor.LocalExecutorStats;
/**
* Note: very experimental and untested!
*
* Implementation of the Activiti {@link AsyncExecutor} using a distributed {@link ExecutorService}
* from {@link Hazelcast}, the {@link IExecutorService}.
*
......@@ -49,26 +51,13 @@ import com.hazelcast.monitor.LocalExecutorStats;
*
* @author Joram Barrez
*/
public class HazelCastDistributedAsyncExecutor implements AsyncExecutor {
public class HazelCastDistributedAsyncExecutor extends DefaultAsyncJobExecutor {
private static final Logger logger = LoggerFactory.getLogger(HazelCastDistributedAsyncExecutor.class);
private static final String EXECUTOR_NAME = "activiti";
// Injectable
protected boolean isAutoActivate;
protected CommandExecutor commandExecutor;
protected int maxTimerJobsPerAcquisition = 1;
protected int maxAsyncJobsDuePerAcquisition = 1;
protected int defaultTimerJobAcquireWaitTimeInMillis = 10 * 1000;
protected int defaultAsyncJobAcquireWaitTimeInMillis = 10 * 1000;
protected String lockOwner = UUID.randomUUID().toString();
protected int timerLockTimeInMillis = 5 * 60 * 1000;
protected int asyncJobLockTimeInMillis = 5 * 60 * 1000;
// Runtime
protected boolean isActive;
private HazelcastInstance hazelcastInstance;
private IExecutorService executorService;
......@@ -83,12 +72,16 @@ public class HazelCastDistributedAsyncExecutor implements AsyncExecutor {
hazelcastInstance = Hazelcast.newHazelcastInstance();
executorService = hazelcastInstance.getExecutorService(EXECUTOR_NAME);
isActive = true;
// Starts up all acquire threads, etc
super.start();
}
@Override
public void shutdown() {
super.shutdown();
try {
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
......@@ -101,13 +94,14 @@ public class HazelCastDistributedAsyncExecutor implements AsyncExecutor {
+ " jobs. Total execution time = " + localExecutorStats.getTotalExecutionLatency());
hazelcastInstance.shutdown();
isActive = false;
}
@Override
public void executeAsyncJob(JobEntity job) {
try {
executorService.submit(new DistributedExecuteJobRunnable(job));
System.out.println("BIEP");
} catch (RejectedExecutionException e) {
logger.info("Async job execution rejected. Executing job in calling thread.");
// Execute in calling thread so the job executor can be freed
......@@ -115,32 +109,6 @@ public class HazelCastDistributedAsyncExecutor implements AsyncExecutor {
}
}
@Override
public boolean isActive() {
return isActive;
}
@Override
public CommandExecutor getCommandExecutor() {
return commandExecutor;
}
@Override
public void setCommandExecutor(CommandExecutor commandExecutor) {
this.commandExecutor = commandExecutor;
}
@Override
public boolean isAutoActivate() {
return isAutoActivate;
}
@Override
public void setAutoActivate(boolean isAutoActivate) {
this.isAutoActivate = isAutoActivate;
}
public static class DistributedExecuteJobRunnable implements Runnable, Serializable {
private static final long serialVersionUID = -6294645802377574363L;
......@@ -160,62 +128,4 @@ public class HazelCastDistributedAsyncExecutor implements AsyncExecutor {
}
public String getLockOwner() {
return lockOwner;
}
public void setLockOwner(String lockOwner) {
this.lockOwner = lockOwner;
}
public int getTimerLockTimeInMillis() {
return timerLockTimeInMillis;
}
public void setTimerLockTimeInMillis(int timerLockTimeInMillis) {
this.timerLockTimeInMillis = timerLockTimeInMillis;
}
public int getAsyncJobLockTimeInMillis() {
return asyncJobLockTimeInMillis;
}
public void setAsyncJobLockTimeInMillis(int asyncJobLockTimeInMillis) {
this.asyncJobLockTimeInMillis = asyncJobLockTimeInMillis;
}
public int getMaxTimerJobsPerAcquisition() {
return maxTimerJobsPerAcquisition;
}
public void setMaxTimerJobsPerAcquisition(int maxTimerJobsPerAcquisition) {
this.maxTimerJobsPerAcquisition = maxTimerJobsPerAcquisition;
}
public int getMaxAsyncJobsDuePerAcquisition() {
return maxAsyncJobsDuePerAcquisition;
}
public void setMaxAsyncJobsDuePerAcquisition(int maxAsyncJobsDuePerAcquisition) {
this.maxAsyncJobsDuePerAcquisition = maxAsyncJobsDuePerAcquisition;
}
public int getDefaultTimerJobAcquireWaitTimeInMillis() {
return defaultTimerJobAcquireWaitTimeInMillis;
}
public void setDefaultTimerJobAcquireWaitTimeInMillis(
int defaultTimerJobAcquireWaitTimeInMillis) {
this.defaultTimerJobAcquireWaitTimeInMillis = defaultTimerJobAcquireWaitTimeInMillis;
}
public int getDefaultAsyncJobAcquireWaitTimeInMillis() {
return defaultAsyncJobAcquireWaitTimeInMillis;
}
public void setDefaultAsyncJobAcquireWaitTimeInMillis(
int defaultAsyncJobAcquireWaitTimeInMillis) {
this.defaultAsyncJobAcquireWaitTimeInMillis = defaultAsyncJobAcquireWaitTimeInMillis;
}
}
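The body of DistributedExecuteJobRunnable is collapsed in the hunk above. As a rough, untested sketch only (inferred from the ProcessEngines, ExecuteAsyncJobCmd and CommandExecutor imports in this file, not from the collapsed code itself), a runnable of this kind would typically look up the engine on the receiving cluster member and run the job through its command executor:

package org.activiti.async.executor.hazelcast;

import java.io.Serializable;

import org.activiti.engine.ProcessEngines;
import org.activiti.engine.impl.ProcessEngineImpl;
import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti.engine.impl.cmd.ExecuteAsyncJobCmd;
import org.activiti.engine.impl.persistence.entity.JobEntity;

public class ExecuteJobOnMemberSketch implements Runnable, Serializable {

  private static final long serialVersionUID = 1L;

  private final JobEntity job;

  public ExecuteJobOnMemberSketch(JobEntity job) {
    this.job = job;
  }

  public void run() {
    // Assumes the engine on this member is registered as the default process engine
    ProcessEngineConfigurationImpl configuration =
        ((ProcessEngineImpl) ProcessEngines.getDefaultProcessEngine()).getProcessEngineConfiguration();

    // Execute the job through a regular Activiti command, as the local async executor would
    configuration.getCommandExecutor().execute(new ExecuteAsyncJobCmd(job));
  }
}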
package org.activiti.async.executor.hazelcast;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
......@@ -8,8 +7,8 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.activiti.engine.impl.asyncexecutor.AsyncExecutor;
import org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor;
import org.activiti.engine.impl.asyncexecutor.ExecuteAsyncRunnable;
import org.activiti.engine.impl.interceptor.CommandExecutor;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -21,6 +20,8 @@ import com.hazelcast.core.IQueue;
import com.hazelcast.monitor.LocalQueueStats;
/**
* Note: very experimental and untested!
*
* Implementation of the Activiti {@link AsyncExecutor} using a distributed queue where the jobs
* to be executed are put on. One of the distributed nodes will take the job off the queue,
* and hand it off the local thread pool.
......@@ -70,45 +71,14 @@ import com.hazelcast.monitor.LocalQueueStats;
*
* @author Joram Barrez
*/
public class HazelCastDistributedQueueBasedAsyncExecutor implements AsyncExecutor {
public class HazelCastDistributedQueueBasedAsyncExecutor extends DefaultAsyncJobExecutor {
private static final Logger logger = LoggerFactory.getLogger(HazelCastDistributedQueueBasedAsyncExecutor.class);
private static final String QUEUE_NAME = "activiti";
// Injectable
protected boolean isAutoActivate;
protected CommandExecutor commandExecutor;
protected int maxTimerJobsPerAcquisition = 1;
protected int maxAsyncJobsDuePerAcquisition = 1;
protected int defaultTimerJobAcquireWaitTimeInMillis = 10 * 1000;
protected int defaultAsyncJobAcquireWaitTimeInMillis = 10 * 1000;
protected String lockOwner = UUID.randomUUID().toString();
protected int timerLockTimeInMillis = 5 * 60 * 1000;
protected int asyncJobLockTimeInMillis = 5 * 60 * 1000;
/** The minimal number of threads that are kept alive in the threadpool for job execution */
protected int corePoolSize = 2;
/** The maximum number of threads that are kept alive in the threadpool for job execution */
protected int maxPoolSize = 10;
/**
 * The time (in milliseconds) a thread used for job execution must be kept alive before it is
 * destroyed. Default is 60000 (one minute). A non-zero setting consumes some resources, but
 * when many jobs are executed it avoids constantly creating new threads.
 */
protected long keepAliveTime = 60000L;
/** The size of the queue on which jobs to be executed are placed */
protected int queueSize = 100;
/** The time (in seconds) that is waited to gracefully shut down the threadpool used for job execution */
protected long secondsToWaitOnShutdown = 60L;
// Runtime
protected boolean isActive;
protected HazelcastInstance hazelcastInstance;
protected IQueue<JobEntity> jobQueue;
......@@ -132,7 +102,9 @@ public class HazelCastDistributedQueueBasedAsyncExecutor implements AsyncExecuto
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executorService = threadPoolExecutor;
isActive = true;
super.start();
// Needs to be done afterwards, since it depends on isActive
initJobQueueListener();
}
......@@ -145,7 +117,10 @@ public class HazelCastDistributedQueueBasedAsyncExecutor implements AsyncExecuto
try {
job = jobQueue.take(); // Blocking
} catch (InterruptedException e1) {
logger.info("jobQueueListenerThread interrupted. This is fine if the job executor is shutting down");
// Do nothing, this can happen when shutting down
} catch (HazelcastInstanceNotActiveException notActiveException) {
logger.info("Hazel cast not active exception caught. This is fine if the job executor is shutting down");
}
if (job != null) {
......@@ -161,26 +136,36 @@ public class HazelCastDistributedQueueBasedAsyncExecutor implements AsyncExecuto
@Override
public void shutdown() {
super.shutdown();
// Shut down local execution service
try {
logger.info("Shutting down local executor service");
executorService.shutdown();
executorService.awaitTermination(secondsToWaitOnShutdown, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("Exception while waiting for executor service shutdown", e);
}
// Shut down listener thread
isActive = false;
LocalQueueStats localQueueStats = jobQueue.getLocalQueueStats();
logger.info("This async job executor has processed " + localQueueStats.getPollOperationCount());
// Shut down hazelcast
try {
LocalQueueStats localQueueStats = jobQueue.getLocalQueueStats();
logger.info("This async job executor has processed " + localQueueStats.getPollOperationCount());
hazelcastInstance.shutdown();
} catch (HazelcastInstanceNotActiveException e) {
// Nothing to do
}
// Shut down listener thread
isActive = false;
try {
logger.info("Shutting down jobQueueListenerThread");
jobQueueListenerThread.interrupt();
jobQueueListenerThread.join();
} catch (InterruptedException e) {
logger.warn("jobQueueListenerThread join was interrupted", e);
}
}
@Override
......@@ -192,133 +177,4 @@ public class HazelCastDistributedQueueBasedAsyncExecutor implements AsyncExecuto
}
}
@Override
public boolean isActive() {
return isActive;
}
@Override
public CommandExecutor getCommandExecutor() {
return commandExecutor;
}
@Override
public void setCommandExecutor(CommandExecutor commandExecutor) {
this.commandExecutor = commandExecutor;
}
@Override
public boolean isAutoActivate() {
return isAutoActivate;
}
@Override
public void setAutoActivate(boolean isAutoActivate) {
this.isAutoActivate = isAutoActivate;
}
public int getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public long getKeepAliveTime() {
return keepAliveTime;
}
public void setKeepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}
public int getQueueSize() {
return queueSize;
}
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
public long getSecondsToWaitOnShutdown() {
return secondsToWaitOnShutdown;
}
public void setSecondsToWaitOnShutdown(long secondsToWaitOnShutdown) {
this.secondsToWaitOnShutdown = secondsToWaitOnShutdown;
}
public void setActive(boolean isActive) {
this.isActive = isActive;
}
public String getLockOwner() {
return lockOwner;
}
public void setLockOwner(String lockOwner) {
this.lockOwner = lockOwner;
}
public int getTimerLockTimeInMillis() {
return timerLockTimeInMillis;
}
public void setTimerLockTimeInMillis(int timerLockTimeInMillis) {
this.timerLockTimeInMillis = timerLockTimeInMillis;
}
public int getAsyncJobLockTimeInMillis() {
return asyncJobLockTimeInMillis;
}
public void setAsyncJobLockTimeInMillis(int asyncJobLockTimeInMillis) {
this.asyncJobLockTimeInMillis = asyncJobLockTimeInMillis;
}
public int getMaxTimerJobsPerAcquisition() {
return maxTimerJobsPerAcquisition;
}
public void setMaxTimerJobsPerAcquisition(int maxTimerJobsPerAcquisition) {
this.maxTimerJobsPerAcquisition = maxTimerJobsPerAcquisition;
}
public int getMaxAsyncJobsDuePerAcquisition() {
return maxAsyncJobsDuePerAcquisition;
}
public void setMaxAsyncJobsDuePerAcquisition(int maxAsyncJobsDuePerAcquisition) {
this.maxAsyncJobsDuePerAcquisition = maxAsyncJobsDuePerAcquisition;
}
public int getDefaultTimerJobAcquireWaitTimeInMillis() {
return defaultTimerJobAcquireWaitTimeInMillis;
}
public void setDefaultTimerJobAcquireWaitTimeInMillis(
int defaultTimerJobAcquireWaitTimeInMillis) {
this.defaultTimerJobAcquireWaitTimeInMillis = defaultTimerJobAcquireWaitTimeInMillis;
}
public int getDefaultAsyncJobAcquireWaitTimeInMillis() {
return defaultAsyncJobAcquireWaitTimeInMillis;
}
public void setDefaultAsyncJobAcquireWaitTimeInMillis(
int defaultAsyncJobAcquireWaitTimeInMillis) {
this.defaultAsyncJobAcquireWaitTimeInMillis = defaultAsyncJobAcquireWaitTimeInMillis;
}
}
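The class Javadoc above describes the mechanism: jobs are offered to a distributed Hazelcast queue, one member takes each job off and hands it to its local thread pool. Below is a self-contained, untested sketch of that listener pattern; the constructor arguments and the ExecuteAsyncRunnable(JobEntity, CommandExecutor) signature are assumptions based on the imports shown in the diff, not the collapsed method bodies.

package org.activiti.async.executor.hazelcast;

import java.util.concurrent.ExecutorService;

import org.activiti.engine.impl.asyncexecutor.ExecuteAsyncRunnable;
import org.activiti.engine.impl.interceptor.CommandExecutor;
import org.activiti.engine.impl.persistence.entity.JobEntity;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IQueue;

public class JobQueueListenerSketch implements Runnable {

  private final IQueue<JobEntity> jobQueue;
  private final ExecutorService executorService;
  private final CommandExecutor commandExecutor;

  private volatile boolean active = true;

  public JobQueueListenerSketch(IQueue<JobEntity> jobQueue, ExecutorService executorService,
      CommandExecutor commandExecutor) {
    this.jobQueue = jobQueue;
    this.executorService = executorService;
    this.commandExecutor = commandExecutor;
  }

  public void run() {
    while (active) {
      JobEntity job = null;
      try {
        job = jobQueue.take(); // Blocks until another node (or this one) offers a job
      } catch (InterruptedException e) {
        // Expected while shutting down
      } catch (HazelcastInstanceNotActiveException e) {
        // Expected while Hazelcast itself is shutting down
      }
      if (job != null) {
        // Execute the job locally, exactly like the DefaultAsyncJobExecutor would
        executorService.execute(new ExecuteAsyncRunnable(job, commandExecutor));
      }
    }
  }

  public void stop() {
    active = false;
  }
}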
package org.activiti.test.async.executor.hazelcast;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.ProcessEngineConfiguration;
/**
* @author Joram Barrez
*/
public class Main {
private static ProcessEngine processEngine;
public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.err.println("Arguments needed");
System.err.println("First argument: { job-creator | job-executor | mixed}, to determine what this Main program will be doing: creating jobs, executing them or both");
System.err.println("Arguments in case of job-creator: { nrOfJobs nrOfJobCreationThreads}");
System.err.println("Arguments in case of job-executor: { nrOfJobs }");
System.err.println("In case of mixed: { nrOfJobs nrOfJobCreationThreads } ");
System.err.println();
System.err.println("Example usage : java -XX:MaxPermSize=512m -Xmx2G -Xms512m -jar theJar.jar job-creator 1000 4");
System.err.println("Example usage : java -XX:MaxPermSize=512m -Xmx2G -Xms512m -jar theJar.jar job-executor 1000");
System.err.println("Example usage : java -XX:MaxPermSize=512m -Xmx2G -Xms512m -jar theJar.jar mixed 1000 4");
System.err.println();
System.err.println("The last argument can always be 'keepDb'. In that case, no drop of the database schema will be done, which is the default for job creation or mixed.");
System.exit(-1);
}
int nrOfJobs = Integer.valueOf(args[1]);
int nrOfThreads = (args.length > 2 && !"keepDb".equals(args[2])) ? Integer.valueOf(args[2]) : -1;
boolean keepDb = "keepDb".equals(args[args.length - 1]);
boolean dropDb = !keepDb;
if ("job-creator".equals(args[0])) {
startCreateJobs(nrOfJobs, nrOfThreads, dropDb);
} else if ("job-executor".equals(args[0])) {
startExecuteJobs(nrOfJobs, dropDb);
} else if ("mixed".equals(args[0])) {
startMixed(nrOfJobs, nrOfThreads, dropDb);
} else {
System.err.println("Unknown argument '" + args[0] + "'");
System.exit(-1);
}
}
private static void startCreateJobs(int nrOfJobs, int nrOfThreads, boolean dropDb) throws Exception {
System.out.println(new Date() + " [Job Creator] Starting. Need to create " + nrOfJobs + " job with " + nrOfThreads + " threads");
// Job creator needs to be started first, as it will clean the database!
createJobCreatorProcessEngine(false, dropDb);
processEngine.getRepositoryService().createDeployment().name("job")
.addClasspathResource("job.bpmn20.xml").deploy();
ExecutorService executor = Executors.newFixedThreadPool(nrOfThreads);
for (int i = 0; i < nrOfJobs; i++) {
Runnable worker = new StartTestProcessInstanceThread();
executor.execute(worker);
}
executor.shutdown();
System.out.println(new Date() + " [Job Creator] All StartProcessInstanceThreads handed off to executor service");
boolean finishedAllInstances = false;
long unfinishedCount = 0;
long finishedCount = 0;
while (finishedAllInstances == false) {
unfinishedCount = processEngine.getHistoryService()
.createHistoricProcessInstanceQuery().unfinished().count();
finishedCount = processEngine.getHistoryService()
.createHistoricProcessInstanceQuery().finished().count();
finishedAllInstances = finishedCount >= nrOfJobs;
if (!finishedAllInstances) {
System.out.println(new Date() + " [Job Creator] " + finishedCount + " finished process instances in db, " + unfinishedCount + " unfinished process instances in db.");
Thread.sleep(10000L);
}
}
}
private static void startExecuteJobs(int nrOfJobs, boolean dropDb) throws Exception {
System.out.println(new Date() + " [Job Executor] Starting. Need to execute " + nrOfJobs + " jobs in total.");
createJobExecutorProcessEngine(false, dropDb);
boolean finishedAllInstances = false;
long lastPrintTime = 0;
long finishedCount = 0;
long start = System.currentTimeMillis();
while (finishedAllInstances == false) {
finishedCount = processEngine.getHistoryService()
.createHistoricProcessInstanceQuery().finished().count();
if (finishedCount >= nrOfJobs) {
finishedAllInstances = true;
} else {
if (System.currentTimeMillis() - lastPrintTime > 5000L) {
lastPrintTime = System.currentTimeMillis();
int percentage = (int) (((double) finishedCount / (double) nrOfJobs) * 100.0);
System.out.println(new Date() + " [Job Executor] Executed " + finishedCount + "/" + nrOfJobs + " jobs (" + percentage + "%)");
}
Thread.sleep(1000L);
}
}
long end = System.currentTimeMillis();
System.out.println();
System.out.println();
System.out.println("Jobs executed by this node: " + SleepDelegate.nrOfExecutions.get());
long time = end - start;
System.out.println("Took " + time + " ms");
double perSecond = ((double) SleepDelegate.nrOfExecutions.get() / (double) time) * 1000;
System.out.println("Which is " + perSecond + " jobs/second");
System.out.println();
System.out.println();
}
private static void startMixed(final int nrOfJobs, final int nrOfThreads, boolean dropDb) throws Exception {
createJobExecutorProcessEngine(true, dropDb);
// Create Jobs
Thread createJobsThread = new Thread(new Runnable() {
public void run() {
try {
startCreateJobs(nrOfJobs, nrOfThreads, false);
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
});
createJobsThread.start();
// Execute jobs
Thread executeJobsThread = new Thread(new Runnable() {
public void run() {
try {
startExecuteJobs(nrOfJobs, false);
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
});
executeJobsThread.start();
createJobsThread.join();
executeJobsThread.join();
processEngine.close();
System.out.println("Done.");
System.exit(0);
}
private static void createJobCreatorProcessEngine(boolean replaceExisting, boolean isDropDatabaseSchema) {
if (processEngine == null || replaceExisting) {
System.out.println("Creating process engine with config activiti_job_creator.cfg.xml. Dropping db first = " + isDropDatabaseSchema);
ProcessEngineConfiguration processEngineConfiguration = ProcessEngineConfiguration
.createProcessEngineConfigurationFromResource("activiti_job_creator.cfg.xml");
if (isDropDatabaseSchema) {
processEngineConfiguration.setDatabaseSchemaUpdate("drop-create");
}
processEngine = processEngineConfiguration.buildProcessEngine();
}
}
private static void createJobExecutorProcessEngine(boolean replaceExisting, boolean isDropDatabaseSchema) {
if (processEngine == null || replaceExisting) {
System.out.println("Creating process engine with config activiti_with_jobexecutor.cfg.xml. Dropping db first = " + isDropDatabaseSchema);
ProcessEngineConfiguration processEngineConfiguration = ProcessEngineConfiguration
.createProcessEngineConfigurationFromResource("activiti_with_jobexecutor.cfg.xml");
if (isDropDatabaseSchema) {
processEngineConfiguration.setDatabaseSchemaUpdate("drop-create");
}
processEngine = processEngineConfiguration.buildProcessEngine();
}
}
public static class StartTestProcessInstanceThread implements Runnable {
public void run() {
processEngine.getRuntimeService().startProcessInstanceByKey("job");
}
}
}
package org.activiti.test.async.executor.hazelcast;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.Expression;
import org.activiti.engine.delegate.JavaDelegate;
public class SleepDelegate implements JavaDelegate {
public static AtomicInteger nrOfExecutions = new AtomicInteger(0);
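// Injected from the 'sleepTime' activiti:field in job.bpmn20.xml; note that execute() below
// busy-waits for a hardcoded 200 ms and does not evaluate this expression.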
protected Expression sleepTime;
public void execute(DelegateExecution execution) throws Exception {
long startTime = System.currentTimeMillis();
long var = 0;
while (System.currentTimeMillis() - startTime < 200) {
var += startTime; // Doing something to keep the JVM busy
Thread.sleep(50L); // Doing some sleeps to mimic I/O
}
nrOfExecutions.incrementAndGet();
// System.out.println(new Date() + "Job Done!");
}
public void setSleepTime(Expression sleepTime) {
this.sleepTime = sleepTime;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url"value="jdbc:mysql://localhost:3306/activiti_job?characterEncoding=UTF-8" />
<property name="username" value="activiti" />
<property name="password" value="activiti" />
<property name="initialSize" value="1" />
<property name="maxActive" value="10" />
<property name="maxIdle" value="2" />
<property name="minIdle" value="1" />
<property name="timeBetweenEvictionRunsMillis" value="34000" />
<property name="minEvictableIdleTimeMillis" value="55000" />
<property name="validationQuery" value="SELECT 1" />
<property name="validationInterval" value="34000" />
<property name="testOnBorrow" value="true" />
<property name="removeAbandoned" value="true" />
<property name="removeAbandonedTimeout" value="55" />
</bean>
<bean id="processEngineConfiguration"
class="org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration">
<property name="dataSource" ref="dataSource" />
<property name="jobExecutorActivate" value="false" />
<property name="idGenerator">
<bean class="org.activiti.engine.impl.persistence.StrongUuidGenerator" />
</property>
</bean>
</beans>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Tomcat connection pooling -->
<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/activiti_job?characterEncoding=UTF-8" />
<property name="username" value="activiti" />
<property name="password" value="activiti" />
<property name="initialSize" value="25" />
<property name="maxActive" value="500" />
<property name="maxIdle" value="50" />
<property name="minIdle" value="25" />
<property name="timeBetweenEvictionRunsMillis" value="34000" />
<property name="minEvictableIdleTimeMillis" value="55000" />
<property name="validationQuery" value="SELECT 1" />
<property name="validationInterval" value="34000" />
<property name="testOnBorrow" value="true" />
<property name="removeAbandoned" value="true" />
<property name="removeAbandonedTimeout" value="55" />
</bean>
<bean id="processEngineConfiguration" class="org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration">
<property name="dataSource" ref="dataSource" />
<property name="idGenerator">
<bean class="org.activiti.engine.impl.persistence.StrongUuidGenerator" />
</property>
<!-- job executor configurations -->
<property name="jobExecutorActivate" value="false" />
<!-- Comment out following line to run default async executor -->
<property name="asyncExecutor" ref="hazelCastAsyncExecutor" />
<property name="asyncExecutorEnabled" value="true" />
<property name="asyncExecutorActivate" value="true" />
</bean>
<!-- <bean id="hazelCastAsyncExecutor" class="org.activiti.async.executor.hazelcast.HazelCastDistributedAsyncExecutor"></bean> -->
<bean id="hazelCastAsyncExecutor" class="org.activiti.async.executor.hazelcast.HazelCastDistributedQueueBasedAsyncExecutor"></bean>
</beans>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.3.xsd"
xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<group>
<name>dev</name>
<password>dev-pass</password>
</group>
<network>
<port auto-increment="false" port-count="100">57001</port>
<outbound-ports>
<!--
Allowed port range when connecting to other nodes.
0 or * means use system provided port.
-->
<ports>0</ports>
</outbound-ports>
<join>
<multicast enabled="true">
<multicast-group>224.2.2.3</multicast-group>
<multicast-port>54327</multicast-port>
</multicast>
<tcp-ip enabled="false">
<interface>192.168.0.184</interface>
<interface>192.168.0.198</interface>
<interface>192.168.0.171</interface>
</tcp-ip>
</join>
</network>
<partition-group enabled="false"/>
<executor-service name="activiti">
<pool-size>50</pool-size>
<!-- Queue capacity. 0 means Integer.MAX_VALUE. -->
<queue-capacity>1024</queue-capacity>
</executor-service>
<queue name="activiti">
<!--
Maximum size of the queue. When a JVM's local queue size reaches the maximum,
all put/offer operations will get blocked until the queue size
of the JVM goes down below the maximum.
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size>1024</max-size>
<!--
Number of backups. If 1 is set as the backup-count for example,
then all entries of the map will be copied to another JVM for
fail-safety. 0 means no backup.
-->
<backup-count>0</backup-count>
<!--
Number of async backups. 0 means no backup.
-->
<async-backup-count>0</async-backup-count>
<empty-queue-ttl>-1</empty-queue-ttl>
</queue>
</hazelcast>
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn"
targetNamespace="Examples">
<process id="job" >
<startEvent id="theStart" />
<sequenceFlow id="flow1" sourceRef="theStart" targetRef="javaService" />
<serviceTask id="javaService"
name="Java service invocation"
activiti:async="true" activiti:exclusive="false"
activiti:class="org.activiti.test.async.executor.hazelcast.SleepDelegate">
<extensionElements>
<activiti:field name="sleepTime" stringValue="200" />
</extensionElements>
</serviceTask>
<sequenceFlow id="flow2" sourceRef="javaService" targetRef="theEnd" />
<endEvent id="theEnd" />
</process>
</definitions>
\ No newline at end of file
log4j.rootLogger=INFO, CA
# ConsoleAppender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern= %d{hh:mm:ss,SSS} [%t] %-5p %c %x - %m%n
log4j.logger.org.apache.ibatis=INFO
log4j.logger.javax.activation=INFO