提交 bc848140 编写于 作者: J Joram Barrez

Merge branch 'activiti6' of https://github.com/Activiti/Activiti into activiti6

# By Tijs Rademakers
# Via Tijs Rademakers
* 'activiti6' of https://github.com/Activiti/Activiti:
  First stage of job executor backwards compatibilty
......@@ -14,23 +14,34 @@ package org.activiti.engine.compatibility;
import java.util.Map;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.persistence.entity.SignalEventSubscriptionEntity;
import org.activiti.engine.impl.persistence.entity.TaskEntity;
import org.activiti.engine.impl.repository.DeploymentBuilderImpl;
import org.activiti.engine.repository.Deployment;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Clock;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
/**
* @author Joram Barrez
* @author Tijs Rademakers
*/
public interface Activiti5CompatibilityHandler {
public static final String ACTIVITI_5_ENGINE_TAG = "activiti-5";
ProcessDefinition getProcessDefinition(String processDefinitionId);
ProcessDefinition getProcessDefinitionByKey(String processDefinitionKey);
Deployment deploy(DeploymentBuilderImpl deploymentBuilder);
ProcessInstance startProcessInstance(String processDefinitionKey, String processDefinitionId, Map<String, Object> variables, String businessKey, String tenantId, String processInstanceName);
void deleteProcessInstance(String processInstanceId, String deleteReason);
void completeTask(TaskEntity taskEntity, Map<String, Object> variables, boolean localScope);
void trigger(String executionId, Map<String, Object> processVariables);
......@@ -39,4 +50,9 @@ public interface Activiti5CompatibilityHandler {
void signalEventReceived(SignalEventSubscriptionEntity signalEventSubscriptionEntity, Object payload, boolean async);
void executeJob(Job job);
void executeJobWithLockAndRetry(JobEntity job);
Clock getClock();
}
......@@ -24,6 +24,7 @@ import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.interceptor.CommandExecutor;
import org.activiti.engine.impl.jobexecutor.FailedJobCommandFactory;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.util.Activiti5Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,6 +45,21 @@ public class ExecuteAsyncRunnable implements Runnable {
}
public void run() {
Boolean isActiviti5ProcessDefinition = commandExecutor.execute(new Command<Boolean>() {
@Override
public Boolean execute(CommandContext commandContext) {
boolean isActiviti5 = Activiti5Util.isActiviti5ProcessDefinitionId(commandContext, job.getProcessDefinitionId());
if (isActiviti5) {
commandContext.getProcessEngineConfiguration().getActiviti5CompatibilityHandler().executeJobWithLockAndRetry(job);
}
return isActiviti5;
}
});
if (isActiviti5ProcessDefinition) {
return;
}
try {
if (job.isExclusive()) {
......@@ -65,8 +81,6 @@ public class ExecuteAsyncRunnable implements Runnable {
}
try {
commandExecutor.execute(new ExecuteAsyncJobCmd(job));
......
......@@ -12,12 +12,17 @@
*/
package org.activiti.engine.impl.cmd;
import java.io.Serializable;
import org.activiti.engine.ActivitiIllegalArgumentException;
import org.activiti.engine.ActivitiObjectNotFoundException;
import org.activiti.engine.compatibility.Activiti5CompatibilityHandler;
import org.activiti.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandContext;
import java.io.Serializable;
import org.activiti.engine.impl.persistence.entity.ExecutionEntity;
import org.activiti.engine.impl.util.Activiti5Util;
import org.activiti.engine.runtime.ProcessInstance;
/**
* @author Joram Barrez
......@@ -48,7 +53,18 @@ public class DeleteProcessInstanceCmd implements Command<Void>, Serializable {
.dispatchEvent(ActivitiEventBuilder.createCancelledEvent(this.processInstanceId, this.processInstanceId, null, deleteReason));
}
commandContext.getExecutionEntityManager().deleteProcessInstanceExecutionEntity(processInstanceId, null, deleteReason, true);
ExecutionEntity processInstanceEntity = commandContext.getExecutionEntityManager().findExecutionById(processInstanceId);
if (processInstanceEntity == null) {
throw new ActivitiObjectNotFoundException("No process instance found for id '" + processInstanceId + "'", ProcessInstance.class);
}
if (Activiti5Util.isActiviti5ProcessDefinitionId(commandContext, processInstanceEntity.getProcessDefinitionId())) {
Activiti5CompatibilityHandler activiti5CompatibilityHandler = Activiti5Util.getActiviti5CompatibilityHandler(commandContext);
activiti5CompatibilityHandler.deleteProcessInstance(processInstanceId, deleteReason);
} else {
commandContext.getExecutionEntityManager().deleteProcessInstanceExecutionEntity(processInstanceEntity, null, deleteReason, false, true);
}
// TODO : remove following line of deleteProcessInstanceExecutionEntity is found to be doing the same as deleteProcessInstance
// commandContext.getExecutionEntityManager().deleteProcessInstance(processInstanceId, deleteReason);
......
......@@ -17,6 +17,7 @@ import java.io.Serializable;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.ActivitiIllegalArgumentException;
import org.activiti.engine.JobNotFoundException;
import org.activiti.engine.compatibility.Activiti5CompatibilityHandler;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti.engine.impl.cfg.TransactionState;
......@@ -25,6 +26,7 @@ import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.interceptor.CommandContextCloseListener;
import org.activiti.engine.impl.jobexecutor.FailedJobListener;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.util.Activiti5Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -67,6 +69,12 @@ public class ExecuteJobsCmd implements Command<Object>, Serializable {
log.debug("Executing job {}", job.getId());
}
if (Activiti5Util.isActiviti5ProcessDefinitionId(commandContext, job.getProcessDefinitionId())) {
Activiti5CompatibilityHandler activiti5CompatibilityHandler = Activiti5Util.getActiviti5CompatibilityHandler(commandContext);
activiti5CompatibilityHandler.executeJob(job);
return null;
}
commandContext.addCloseListener(new ManualJobExecutionCommandContextCloseListener(job));
try {
......
......@@ -12,9 +12,9 @@
*/
package org.activiti.compatibility.test;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
import org.activiti.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.activiti.engine.repository.DeploymentProperties;
import org.activiti.engine.repository.ProcessDefinition;
......
......@@ -16,28 +16,75 @@ package org.activiti.compatibility;
import java.util.HashMap;
import java.util.Map;
import org.activiti.compatibility.wrapper.Activiti5ClockWrapper;
import org.activiti.compatibility.wrapper.Activiti5DeploymentWrapper;
import org.activiti.compatibility.wrapper.Activiti5ProcessDefinitionWrapper;
import org.activiti.compatibility.wrapper.Activiti5ProcessInstanceWrapper;
import org.activiti.engine.compatibility.Activiti5CompatibilityHandler;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.persistence.entity.DeploymentEntity;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.persistence.entity.ResourceEntity;
import org.activiti.engine.impl.persistence.entity.SignalEventSubscriptionEntity;
import org.activiti.engine.impl.persistence.entity.TaskEntity;
import org.activiti.engine.impl.persistence.entity.TimerEntity;
import org.activiti.engine.impl.repository.DeploymentBuilderImpl;
import org.activiti.engine.repository.Deployment;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.Clock;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti5.engine.ProcessEngine;
import org.activiti5.engine.impl.asyncexecutor.AsyncJobUtil;
import org.activiti5.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti5.engine.impl.interceptor.Command;
import org.activiti5.engine.impl.interceptor.CommandContext;
import org.activiti5.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.activiti5.engine.repository.DeploymentBuilder;
/**
* @author Joram Barrez
* @author Tijs Rademakers
*/
public class DefaultActiviti5CompatibilityHandler implements Activiti5CompatibilityHandler {
protected DefaultProcessEngineFactory processEngineFactory;
protected ProcessEngine processEngine;
public ProcessDefinition getProcessDefinition(final String processDefinitionId) {
final ProcessEngineConfigurationImpl processEngineConfig = (ProcessEngineConfigurationImpl) getProcessEngine().getProcessEngineConfiguration();
ProcessDefinitionEntity processDefinitionEntity = processEngineConfig.getCommandExecutor().execute(new Command<ProcessDefinitionEntity>() {
@Override
public ProcessDefinitionEntity execute(CommandContext commandContext) {
return processEngineConfig.getDeploymentManager().findDeployedProcessDefinitionById(processDefinitionId);
}
});
Activiti5ProcessDefinitionWrapper wrapper = null;
if (processDefinitionEntity != null) {
wrapper = new Activiti5ProcessDefinitionWrapper(processDefinitionEntity);
}
return wrapper;
}
public ProcessDefinition getProcessDefinitionByKey(final String processDefinitionKey) {
final ProcessEngineConfigurationImpl processEngineConfig = (ProcessEngineConfigurationImpl) getProcessEngine().getProcessEngineConfiguration();
ProcessDefinitionEntity processDefinitionEntity = processEngineConfig.getCommandExecutor().execute(new Command<ProcessDefinitionEntity>() {
@Override
public ProcessDefinitionEntity execute(CommandContext commandContext) {
return processEngineConfig.getDeploymentManager().findDeployedLatestProcessDefinitionByKey(processDefinitionKey);
}
});
Activiti5ProcessDefinitionWrapper wrapper = null;
if (processDefinitionEntity != null) {
wrapper = new Activiti5ProcessDefinitionWrapper(processDefinitionEntity);
}
return wrapper;
}
public Deployment deploy(DeploymentBuilderImpl activiti6DeploymentBuilder) {
DeploymentBuilder deploymentBuilder = getProcessEngine().getRepositoryService().createDeployment();
......@@ -89,9 +136,12 @@ public class DefaultActiviti5CompatibilityHandler implements Activiti5Compatibil
Map<String, Object> variables, String businessKey, String tenantId, String processInstanceName) {
org.activiti5.engine.runtime.ProcessInstance activiti5ProcessInstance
= getProcessEngine().getRuntimeService().startProcessInstanceByKey(processDefinitionKey);
= getProcessEngine().getRuntimeService().startProcessInstanceByKey(processDefinitionKey, businessKey, variables);
return new Activiti5ProcessInstanceWrapper(activiti5ProcessInstance);
}
public void deleteProcessInstance(String processInstanceId, String deleteReason) {
getProcessEngine().getRuntimeService().deleteProcessInstance(processInstanceId, deleteReason);
}
public void completeTask(TaskEntity taskEntity, Map<String, Object> variables, boolean localScope) {
......@@ -121,6 +171,29 @@ public class DefaultActiviti5CompatibilityHandler implements Activiti5Compatibil
activiti5SignalEvent.eventReceived(payload, async);
}
public void executeJob(Job job) {
final ProcessEngineConfigurationImpl processEngineConfig = (ProcessEngineConfigurationImpl) getProcessEngine().getProcessEngineConfiguration();
final org.activiti5.engine.impl.persistence.entity.JobEntity activity5Job = convertToActiviti5JobEntity((JobEntity) job);
processEngineConfig.getCommandExecutor().execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
activity5Job.execute(commandContext);
return null;
}
});
}
public void executeJobWithLockAndRetry(JobEntity job) {
final ProcessEngineConfigurationImpl processEngineConfig = (ProcessEngineConfigurationImpl) getProcessEngine().getProcessEngineConfiguration();
final org.activiti5.engine.impl.persistence.entity.JobEntity activity5Job = convertToActiviti5JobEntity((JobEntity) job);
AsyncJobUtil.executeJob(activity5Job, processEngineConfig.getCommandExecutor());
}
public Clock getClock() {
return new Activiti5ClockWrapper(getProcessEngine().getProcessEngineConfiguration().getClock());
}
protected ProcessEngine getProcessEngine() {
if (processEngine == null) {
synchronized (this) {
......@@ -143,4 +216,32 @@ public class DefaultActiviti5CompatibilityHandler implements Activiti5Compatibil
this.processEngineFactory = processEngineFactory;
}
protected org.activiti5.engine.impl.persistence.entity.JobEntity convertToActiviti5JobEntity(JobEntity job) {
org.activiti5.engine.impl.persistence.entity.JobEntity activity5Job = null;
if (job instanceof TimerEntity) {
TimerEntity timer = (TimerEntity) job;
org.activiti5.engine.impl.persistence.entity.TimerEntity tempTimer = new org.activiti5.engine.impl.persistence.entity.TimerEntity();
tempTimer.setEndDate(timer.getEndDate());
tempTimer.setRepeat(timer.getRepeat());
activity5Job = tempTimer;
}
activity5Job.setDuedate(job.getDuedate());
activity5Job.setExclusive(job.isExclusive());
activity5Job.setExecutionId(job.getExecutionId());
activity5Job.setId(job.getId());
activity5Job.setJobHandlerConfiguration(job.getJobHandlerConfiguration());
activity5Job.setJobHandlerType(job.getJobHandlerType());
activity5Job.setJobType(job.getJobType());
activity5Job.setLockExpirationTime(job.getLockExpirationTime());
activity5Job.setLockOwner(job.getLockOwner());
activity5Job.setProcessDefinitionId(job.getProcessDefinitionId());
activity5Job.setProcessInstanceId(job.getProcessInstanceId());
activity5Job.setRetries(job.getRetries());
activity5Job.setRevision(job.getRevision());
activity5Job.setTenantId(job.getTenantId());
return activity5Job;
}
}
......@@ -20,6 +20,8 @@ import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration;
import org.activiti5.engine.ActivitiException;
import org.activiti5.engine.ProcessEngine;
import org.activiti5.engine.impl.asyncexecutor.AsyncExecutor;
import org.activiti5.engine.impl.asyncexecutor.DefaultAsyncJobExecutor;
import org.activiti5.engine.parse.BpmnParseHandler;
......@@ -37,6 +39,15 @@ public class DefaultProcessEngineFactory {
activiti5Configuration = new org.activiti5.engine.impl.cfg.StandaloneProcessEngineConfiguration();
activiti5Configuration.setDataSource(activiti6Configuration.getDataSource());
if (activiti6Configuration.isAsyncExecutorEnabled() && activiti6Configuration.getAsyncExecutor() != null) {
AsyncExecutor activiti5AsyncExecutor = new DefaultAsyncJobExecutor();
activiti5AsyncExecutor.setAsyncJobLockTimeInMillis(activiti6Configuration.getAsyncExecutor().getAsyncJobLockTimeInMillis());
activiti5AsyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(activiti6Configuration.getAsyncExecutor().getDefaultAsyncJobAcquireWaitTimeInMillis());
activiti5AsyncExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(activiti6Configuration.getAsyncExecutor().getDefaultTimerJobAcquireWaitTimeInMillis());
activiti5AsyncExecutor.setRetryWaitTimeInMillis(activiti6Configuration.getAsyncExecutor().getRetryWaitTimeInMillis());
activiti5AsyncExecutor.setTimerLockTimeInMillis(activiti6Configuration.getAsyncExecutor().getTimerLockTimeInMillis());
activiti5Configuration.setAsyncExecutor(activiti5AsyncExecutor);
}
} else {
throw new ActivitiException("Unsupported process engine configuration");
......
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activiti.compatibility.wrapper;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
import org.activiti.engine.runtime.Clock;
/**
* @author Tijs Rademakers
*/
public class Activiti5ClockWrapper implements Clock {
protected org.activiti5.engine.runtime.Clock activiti5Clock;
public Activiti5ClockWrapper(org.activiti5.engine.runtime.Clock activiti5Clock) {
this.activiti5Clock = activiti5Clock;
}
@Override
public void setCurrentTime(Date currentTime) {
activiti5Clock.setCurrentTime(currentTime);
}
@Override
public void setCurrentCalendar(Calendar currentTime) {
activiti5Clock.setCurrentCalendar(currentTime);
}
@Override
public void reset() {
activiti5Clock.reset();
}
@Override
public Date getCurrentTime() {
return activiti5Clock.getCurrentTime();
}
@Override
public Calendar getCurrentCalendar() {
return activiti5Clock.getCurrentCalendar();
}
@Override
public Calendar getCurrentCalendar(TimeZone timeZone) {
return activiti5Clock.getCurrentCalendar(timeZone);
}
@Override
public TimeZone getCurrentTimeZone() {
return activiti5Clock.getCurrentTimeZone();
}
}
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activiti.compatibility.wrapper;
import org.activiti.engine.repository.ProcessDefinition;
/**
* Wraps an Activiti 5 process definition to an Activiti 6 {@link ProcessDefinition}.
*
* @author Tijs Rademakers
*/
public class Activiti5ProcessDefinitionWrapper implements ProcessDefinition {
private org.activiti5.engine.repository.ProcessDefinition activit5ProcessDefinition;
public Activiti5ProcessDefinitionWrapper(org.activiti5.engine.repository.ProcessDefinition activit5ProcessDefinition) {
this.activit5ProcessDefinition = activit5ProcessDefinition;
}
@Override
public String getId() {
return activit5ProcessDefinition.getId();
}
@Override
public String getCategory() {
return activit5ProcessDefinition.getCategory();
}
@Override
public String getName() {
return activit5ProcessDefinition.getName();
}
@Override
public String getKey() {
return activit5ProcessDefinition.getKey();
}
@Override
public String getDescription() {
return activit5ProcessDefinition.getDescription();
}
@Override
public int getVersion() {
return activit5ProcessDefinition.getVersion();
}
@Override
public String getResourceName() {
return activit5ProcessDefinition.getResourceName();
}
@Override
public String getDeploymentId() {
return activit5ProcessDefinition.getDeploymentId();
}
@Override
public String getDiagramResourceName() {
return activit5ProcessDefinition.getDiagramResourceName();
}
@Override
public boolean hasStartFormKey() {
return activit5ProcessDefinition.hasStartFormKey();
}
@Override
public boolean hasGraphicalNotation() {
return activit5ProcessDefinition.hasGraphicalNotation();
}
@Override
public boolean isSuspended() {
return activit5ProcessDefinition.isSuspended();
}
@Override
public String getTenantId() {
return activit5ProcessDefinition.getTenantId();
}
public Object getRawObject() {
return activit5ProcessDefinition;
}
}
package org.activiti5.engine.impl.asyncexecutor;
import org.activiti5.engine.ActivitiOptimisticLockingException;
import org.activiti5.engine.delegate.event.ActivitiEventType;
import org.activiti5.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti5.engine.impl.cmd.ExecuteAsyncJobCmd;
import org.activiti5.engine.impl.cmd.LockExclusiveJobCmd;
import org.activiti5.engine.impl.cmd.UnlockExclusiveJobCmd;
import org.activiti5.engine.impl.interceptor.Command;
import org.activiti5.engine.impl.interceptor.CommandConfig;
import org.activiti5.engine.impl.interceptor.CommandContext;
import org.activiti5.engine.impl.interceptor.CommandExecutor;
import org.activiti5.engine.impl.jobexecutor.FailedJobCommandFactory;
import org.activiti5.engine.impl.persistence.entity.JobEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncJobUtil {
private static Logger log = LoggerFactory.getLogger(AsyncJobUtil.class);
public static void executeJob(final JobEntity job, final CommandExecutor commandExecutor) {
try {
if (job.isExclusive()) {
commandExecutor.execute(new LockExclusiveJobCmd(job));
}
} catch (Throwable lockException) {
if (log.isDebugEnabled()) {
log.debug("Exception during exclusive job acquisition. Retrying job.", lockException.getMessage());
}
commandExecutor.execute(new Command<Void>() {
public Void execute(CommandContext commandContext) {
commandContext.getJobEntityManager().retryAsyncJob(job);
return null;
}
});
return;
}
try {
commandExecutor.execute(new ExecuteAsyncJobCmd(job));
} catch (final ActivitiOptimisticLockingException e) {
handleFailedJob(job, e, commandExecutor);
if (log.isDebugEnabled()) {
log.debug("Optimistic locking exception during job execution. If you have multiple async executors running against the same database, " +
"this exception means that this thread tried to acquire an exclusive job, which already was changed by another async executor thread." +
"This is expected behavior in a clustered environment. " +
"You can ignore this message if you indeed have multiple job executor threads running against the same database. " +
"Exception message: {}", e.getMessage());
}
} catch (Throwable exception) {
handleFailedJob(job, exception, commandExecutor);
// Finally, Throw the exception to indicate the ExecuteAsyncJobCmd failed
String message = "Job " + job.getId() + " failed";
log.error(message, exception);
}
try {
if (job.isExclusive()) {
commandExecutor.execute(new UnlockExclusiveJobCmd(job));
}
} catch (ActivitiOptimisticLockingException optimisticLockingException) {
if (log.isDebugEnabled()) {
log.debug("Optimistic locking exception while unlocking the job. If you have multiple async executors running against the same database, " +
"this exception means that this thread tried to acquire an exclusive job, which already was changed by another async executor thread." +
"This is expected behavior in a clustered environment. " +
"You can ignore this message if you indeed have multiple job executor acquisition threads running against the same database. " +
"Exception message: {}", optimisticLockingException.getMessage());
}
return;
} catch (Throwable t) {
log.error("Error while unlocking exclusive job " + job.getId(), t);
return;
}
}
protected static void handleFailedJob(final JobEntity job, final Throwable exception, final CommandExecutor commandExecutor) {
commandExecutor.execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
CommandConfig commandConfig = commandExecutor.getDefaultConfig().transactionRequiresNew();
FailedJobCommandFactory failedJobCommandFactory = commandContext.getFailedJobCommandFactory();
Command<Object> cmd = failedJobCommandFactory.getCommand(job.getId(), exception);
log.trace("Using FailedJobCommandFactory '" + failedJobCommandFactory.getClass() + "' and command of type '" + cmd.getClass() + "'");
commandExecutor.execute(commandConfig, cmd);
// Dispatch an event, indicating job execution failed in a try-catch block, to prevent the original
// exception to be swallowed
if (commandContext.getEventDispatcher().isEnabled()) {
try {
commandContext.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createEntityExceptionEvent(
ActivitiEventType.JOB_EXECUTION_FAILURE, job, exception));
} catch(Throwable ignore) {
log.warn("Exception occured while dispatching job failure event, ignoring.", ignore);
}
}
return null;
}
});
}
}
......@@ -12,20 +12,8 @@
*/
package org.activiti5.engine.impl.asyncexecutor;
import org.activiti5.engine.ActivitiOptimisticLockingException;
import org.activiti5.engine.delegate.event.ActivitiEventType;
import org.activiti5.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti5.engine.impl.cmd.ExecuteAsyncJobCmd;
import org.activiti5.engine.impl.cmd.LockExclusiveJobCmd;
import org.activiti5.engine.impl.cmd.UnlockExclusiveJobCmd;
import org.activiti5.engine.impl.interceptor.Command;
import org.activiti5.engine.impl.interceptor.CommandConfig;
import org.activiti5.engine.impl.interceptor.CommandContext;
import org.activiti5.engine.impl.interceptor.CommandExecutor;
import org.activiti5.engine.impl.jobexecutor.FailedJobCommandFactory;
import org.activiti5.engine.impl.persistence.entity.JobEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Joram Barrez
......@@ -33,8 +21,6 @@ import org.slf4j.LoggerFactory;
*/
public class ExecuteAsyncRunnable implements Runnable {
private static Logger log = LoggerFactory.getLogger(ExecuteAsyncRunnable.class);
protected JobEntity job;
protected CommandExecutor commandExecutor;
......@@ -44,98 +30,6 @@ public class ExecuteAsyncRunnable implements Runnable {
}
public void run() {
try {
if (job.isExclusive()) {
commandExecutor.execute(new LockExclusiveJobCmd(job));
}
} catch (Throwable lockException) {
if (log.isDebugEnabled()) {
log.debug("Exception during exclusive job acquisition. Retrying job.", lockException.getMessage());
}
commandExecutor.execute(new Command<Void>() {
public Void execute(CommandContext commandContext) {
commandContext.getJobEntityManager().retryAsyncJob(job);
return null;
}
});
return;
}
try {
commandExecutor.execute(new ExecuteAsyncJobCmd(job));
} catch (final ActivitiOptimisticLockingException e) {
handleFailedJob(e);
if (log.isDebugEnabled()) {
log.debug("Optimistic locking exception during job execution. If you have multiple async executors running against the same database, " +
"this exception means that this thread tried to acquire an exclusive job, which already was changed by another async executor thread." +
"This is expected behavior in a clustered environment. " +
"You can ignore this message if you indeed have multiple job executor threads running against the same database. " +
"Exception message: {}", e.getMessage());
}
} catch (Throwable exception) {
handleFailedJob(exception);
// Finally, Throw the exception to indicate the ExecuteAsyncJobCmd failed
String message = "Job " + job.getId() + " failed";
log.error(message, exception);
}
try {
if (job.isExclusive()) {
commandExecutor.execute(new UnlockExclusiveJobCmd(job));
}
} catch (ActivitiOptimisticLockingException optimisticLockingException) {
if (log.isDebugEnabled()) {
log.debug("Optimistic locking exception while unlocking the job. If you have multiple async executors running against the same database, " +
"this exception means that this thread tried to acquire an exclusive job, which already was changed by another async executor thread." +
"This is expected behavior in a clustered environment. " +
"You can ignore this message if you indeed have multiple job executor acquisition threads running against the same database. " +
"Exception message: {}", optimisticLockingException.getMessage());
}
return;
} catch (Throwable t) {
log.error("Error while unlocking exclusive job " + job.getId(), t);
return;
}
}
protected void handleFailedJob(final Throwable exception) {
commandExecutor.execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
CommandConfig commandConfig = commandExecutor.getDefaultConfig().transactionRequiresNew();
FailedJobCommandFactory failedJobCommandFactory = commandContext.getFailedJobCommandFactory();
Command<Object> cmd = failedJobCommandFactory.getCommand(job.getId(), exception);
log.trace("Using FailedJobCommandFactory '" + failedJobCommandFactory.getClass() + "' and command of type '" + cmd.getClass() + "'");
commandExecutor.execute(commandConfig, cmd);
// Dispatch an event, indicating job execution failed in a try-catch block, to prevent the original
// exception to be swallowed
if (commandContext.getEventDispatcher().isEnabled()) {
try {
commandContext.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createEntityExceptionEvent(
ActivitiEventType.JOB_EXECUTION_FAILURE, job, exception));
} catch(Throwable ignore) {
log.warn("Exception occured while dispatching job failure event, ignoring.", ignore);
}
}
return null;
}
});
AsyncJobUtil.executeJob(job, commandExecutor);
}
}
......@@ -12,11 +12,11 @@
*/
package org.activiti.engine.test.bpmn.event.end;
import org.activiti5.engine.ActivitiOptimisticLockingException;
import org.activiti.engine.ActivitiOptimisticLockingException;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.task.Task;
import org.activiti.engine.test.Deployment;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.runtime.ProcessInstance;
import org.activiti5.engine.task.Task;
import org.activiti5.engine.test.Deployment;
/**
* @author Joram Barrez
......
......@@ -23,16 +23,17 @@ import java.util.Map;
import org.activiti.bpmn.model.ExtensionAttribute;
import org.activiti.bpmn.model.ExtensionElement;
import org.activiti.compatibility.wrapper.Activiti5ProcessDefinitionWrapper;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.task.Task;
import org.activiti.engine.test.Deployment;
import org.activiti5.engine.delegate.DelegateExecution;
import org.activiti5.engine.delegate.JavaDelegate;
import org.activiti5.engine.impl.bpmn.behavior.TerminateEndEventActivityBehavior;
import org.activiti5.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.activiti5.engine.impl.pvm.process.ActivityImpl;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.repository.ProcessDefinition;
import org.activiti5.engine.runtime.ProcessInstance;
import org.activiti5.engine.task.Task;
import org.activiti5.engine.test.Deployment;
/**
* @author Nico Rehwaldt
......@@ -361,12 +362,13 @@ public class TerminateEndEventTest extends PluggableActivitiTestCase {
}
public void testParseTerminateEndEventDefinitionWithExtensions() {
org.activiti5.engine.repository.Deployment deployment = repositoryService.createDeployment().addClasspathResource("org/activiti/engine/test/bpmn/event/end/TerminateEndEventTest.parseExtensionElements.bpmn20.xml").deploy();
org.activiti.engine.repository.Deployment deployment = repositoryService.createDeployment().addClasspathResource("org/activiti/engine/test/bpmn/event/end/TerminateEndEventTest.parseExtensionElements.bpmn20.xml").deploy();
ProcessDefinition processDefinitionQuery = repositoryService.createProcessDefinitionQuery().deploymentId(deployment.getId()).singleResult();
ProcessDefinitionEntity processDefinition = this.processEngineConfiguration.getProcessDefinitionCache().get(processDefinitionQuery.getId());
assertThat(processDefinition.getActivities().size(), is(2));
ActivityImpl endEvent = processDefinition.getActivities().get(1);
ProcessDefinition processDefinition = processEngineConfiguration.getActiviti5CompatibilityHandler().getProcessDefinition(processDefinitionQuery.getId());
ProcessDefinitionEntity rawEntity = (ProcessDefinitionEntity) ((Activiti5ProcessDefinitionWrapper) processDefinition).getRawObject();
assertThat(rawEntity.getActivities().size(), is(2));
ActivityImpl endEvent = rawEntity.getActivities().get(1);
assertThat(endEvent.getId(), is("terminateEnd"));
assertThat(endEvent.getActivityBehavior(), instanceOf(TerminateEndEventActivityBehavior.class));
TerminateEndEventActivityBehavior terminateEndEventBehavior = (TerminateEndEventActivityBehavior) endEvent.getActivityBehavior();
......
......@@ -13,8 +13,8 @@
package org.activiti.engine.test.bpmn.parallel;
import org.activiti.engine.test.Deployment;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.test.Deployment;
/**
......
......@@ -18,18 +18,15 @@ import java.util.List;
import org.activiti.bpmn.exceptions.XMLException;
import org.activiti.bpmn.model.BpmnModel;
import org.activiti.bpmn.model.Process;
import org.activiti5.engine.impl.context.Context;
import org.activiti5.engine.impl.interceptor.Command;
import org.activiti5.engine.impl.interceptor.CommandContext;
import org.activiti5.engine.impl.interceptor.CommandExecutor;
import org.activiti.compatibility.wrapper.Activiti5ProcessDefinitionWrapper;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.test.Deployment;
import org.activiti5.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.activiti5.engine.impl.pvm.PvmTransition;
import org.activiti5.engine.impl.pvm.process.ActivityImpl;
import org.activiti5.engine.impl.pvm.process.TransitionImpl;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.impl.test.TestHelper;
import org.activiti5.engine.repository.ProcessDefinition;
import org.activiti5.engine.test.Deployment;
/**
......@@ -69,25 +66,17 @@ public class BpmnParseTest extends PluggableActivitiTestCase {
@Deployment
public void testParseDiagramInterchangeElements() {
// Graphical information is not yet exposed publicly, so we need to do some plumbing
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
ProcessDefinitionEntity processDefinitionEntity = commandExecutor.execute(new Command<ProcessDefinitionEntity>() {
public ProcessDefinitionEntity execute(CommandContext commandContext) {
return Context
.getProcessEngineConfiguration()
.getDeploymentManager()
.findDeployedLatestProcessDefinitionByKey("myProcess");
}
});
ProcessDefinition processDefinition = processEngineConfiguration.getActiviti5CompatibilityHandler().getProcessDefinitionByKey("myProcess");
ProcessDefinitionEntity rawEntity = (ProcessDefinitionEntity) ((Activiti5ProcessDefinitionWrapper) processDefinition).getRawObject();
assertNotNull(processDefinitionEntity);
assertEquals(7, processDefinitionEntity.getActivities().size());
assertNotNull(rawEntity);
assertEquals(7, rawEntity.getActivities().size());
// Check if diagram has been created based on Diagram Interchange when it's not a headless instance
List<String> resourceNames = repositoryService.getDeploymentResourceNames(processDefinitionEntity.getDeploymentId());
List<String> resourceNames = repositoryService.getDeploymentResourceNames(rawEntity.getDeploymentId());
assertEquals(2, resourceNames.size());
for (ActivityImpl activity : processDefinitionEntity.getActivities()) {
for (ActivityImpl activity : rawEntity.getActivities()) {
if (activity.getId().equals("theStart")) {
assertActivityBounds(activity, 70, 255, 30, 30);
......@@ -131,19 +120,12 @@ public class BpmnParseTest extends PluggableActivitiTestCase {
@Deployment
public void testParseNamespaceInConditionExpressionType() {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
ProcessDefinitionEntity processDefinitionEntity = commandExecutor.execute(new Command<ProcessDefinitionEntity>() {
public ProcessDefinitionEntity execute(CommandContext commandContext) {
return Context
.getProcessEngineConfiguration()
.getDeploymentManager()
.findDeployedLatestProcessDefinitionByKey("resolvableNamespacesProcess");
}
});
ProcessDefinition processDefinition = processEngineConfiguration.getActiviti5CompatibilityHandler().getProcessDefinitionByKey("resolvableNamespacesProcess");
ProcessDefinitionEntity rawEntity = (ProcessDefinitionEntity) ((Activiti5ProcessDefinitionWrapper) processDefinition).getRawObject();
// Test that the process definition has been deployed
assertNotNull(processDefinitionEntity);
ActivityImpl activity = processDefinitionEntity.findActivity("ExclusiveGateway_1");
assertNotNull(rawEntity);
ActivityImpl activity = rawEntity.findActivity("ExclusiveGateway_1");
assertNotNull(activity);
// Test that the conditions has been resolved
......
......@@ -16,11 +16,11 @@ package org.activiti.engine.test.bpmn.sequenceflow;
import java.util.HashMap;
import java.util.Map;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.task.Task;
import org.activiti.engine.test.Deployment;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.impl.util.CollectionUtil;
import org.activiti5.engine.runtime.ProcessInstance;
import org.activiti5.engine.task.Task;
import org.activiti5.engine.test.Deployment;
/**
......
......@@ -13,10 +13,10 @@
package org.activiti.engine.test.bpmn.sequenceflow;
import org.activiti.engine.test.Deployment;
import org.activiti.engine.test.bpmn.gateway.ExclusiveGatewayTest;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.impl.util.CollectionUtil;
import org.activiti5.engine.test.Deployment;
/**
......
......@@ -14,12 +14,12 @@ package org.activiti.engine.test.bpmn.servicetask;
import java.util.List;
import org.activiti5.engine.identity.Group;
import org.activiti5.engine.identity.User;
import org.activiti.engine.identity.Group;
import org.activiti.engine.identity.User;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.test.Deployment;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.repository.ProcessDefinition;
import org.activiti5.engine.runtime.ProcessInstance;
import org.activiti5.engine.test.Deployment;
/**
* @author Joram Barrez
......
......@@ -15,10 +15,10 @@ package org.activiti.engine.test.bpmn.servicetask;
import java.io.Serializable;
import org.activiti.engine.test.Deployment;
import org.activiti5.engine.delegate.DelegateExecution;
import org.activiti5.engine.delegate.JavaDelegate;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.test.Deployment;
/**
......
......@@ -13,18 +13,18 @@
package org.activiti.engine.test.bpmn.subprocess;
import java.io.InputStream;
import java.util.List;
import org.activiti.bpmn.converter.BpmnXMLConverter;
import org.activiti.bpmn.model.BpmnModel;
import org.activiti5.engine.ActivitiException;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.repository.Deployment;
import org.activiti.engine.repository.ProcessDefinition;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti5.engine.impl.test.ResourceActivitiTestCase;
import org.activiti5.engine.impl.util.io.InputStreamSource;
import org.activiti5.engine.impl.util.io.StreamSource;
import org.activiti5.engine.repository.Deployment;
import org.activiti5.engine.repository.ProcessDefinition;
import org.activiti5.engine.runtime.ProcessInstance;
import java.io.InputStream;
import java.util.List;
public class CallActivityTest extends ResourceActivitiTestCase {
......@@ -39,7 +39,7 @@ public class CallActivityTest extends ResourceActivitiTestCase {
public void testInstantiateProcessByMessage() throws Exception {
BpmnModel messageTriggeredBpmnModel = loadBPMNModel(MESSAGE_TRIGGERED_PROCESS_RESOURCE);
Deployment messageTriggeredBpmnDeployment = processEngine.getRepositoryService()
processEngine.getRepositoryService()
.createDeployment()
.name("messageTriggeredProcessDeployment")
.addBpmnModel("messageTriggered.bpmn20.xml", messageTriggeredBpmnModel).deploy();
......@@ -59,7 +59,7 @@ public class CallActivityTest extends ResourceActivitiTestCase {
suspendProcessDefinitions(messageTriggeredBpmnDeployment);
try {
ProcessInstance childProcessInstance = runtimeService.startProcessInstanceByMessage("TRIGGER_PROCESS_MESSAGE");
runtimeService.startProcessInstanceByMessage("TRIGGER_PROCESS_MESSAGE");
fail("Exception expected");
} catch (ActivitiException ae) {
assertTextPresent("Cannot start process instance. Process definition Message Triggered Process", ae.getMessage());
......@@ -70,7 +70,7 @@ public class CallActivityTest extends ResourceActivitiTestCase {
public void testInstantiateChildProcess() throws Exception {
BpmnModel childBpmnModel = loadBPMNModel(CHILD_PROCESS_RESOURCE);
Deployment childDeployment = processEngine.getRepositoryService()
processEngine.getRepositoryService()
.createDeployment()
.name("childProcessDeployment")
.addBpmnModel("childProcess.bpmn20.xml", childBpmnModel).deploy();
......@@ -90,7 +90,7 @@ public class CallActivityTest extends ResourceActivitiTestCase {
suspendProcessDefinitions(childDeployment);
try {
ProcessInstance childProcessInstance = runtimeService.startProcessInstanceByKey("childProcess");
runtimeService.startProcessInstanceByKey("childProcess");
fail("Exception expected");
} catch (ActivitiException ae) {
assertTextPresent("Cannot start process instance. Process definition Child Process", ae.getMessage());
......@@ -108,7 +108,7 @@ public class CallActivityTest extends ResourceActivitiTestCase {
.name("childProcessDeployment")
.addBpmnModel("childProcess.bpmn20.xml", childBpmnModel).deploy();
Deployment masterDeployment = processEngine.getRepositoryService()
processEngine.getRepositoryService()
.createDeployment()
.name("masterProcessDeployment")
.addBpmnModel("masterProcess.bpmn20.xml", mainBpmnModel).deploy();
......@@ -116,7 +116,7 @@ public class CallActivityTest extends ResourceActivitiTestCase {
suspendProcessDefinitions(childDeployment);
try {
ProcessInstance masterProcessInstance = runtimeService.startProcessInstanceByKey("masterProcess");
runtimeService.startProcessInstanceByKey("masterProcess");
fail("Exception expected");
} catch (ActivitiException ae) {
assertTextPresent("Cannot start process instance. Process definition Child Process", ae.getMessage());
......
......@@ -18,13 +18,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.task.Task;
import org.activiti.engine.task.TaskQuery;
import org.activiti.engine.test.Deployment;
import org.activiti5.engine.impl.test.PluggableActivitiTestCase;
import org.activiti5.engine.impl.util.CollectionUtil;
import org.activiti5.engine.runtime.Job;
import org.activiti5.engine.runtime.ProcessInstance;
import org.activiti5.engine.task.Task;
import org.activiti5.engine.task.TaskQuery;
import org.activiti5.engine.test.Deployment;
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册