提交 a0965733 编写于 作者: J Joram Barrez

Fixes around job executor and exception storage

上级 0aef5c1c
......@@ -31,11 +31,14 @@ import org.activiti.engine.impl.cmd.GetTableCountCmd;
import org.activiti.engine.impl.cmd.GetTableMetaDataCmd;
import org.activiti.engine.impl.cmd.GetTableNameCmd;
import org.activiti.engine.impl.cmd.SetJobRetriesCmd;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.db.DbSqlSession;
import org.activiti.engine.impl.db.DbSqlSessionFactory;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandConfig;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.jobexecutor.JobExecutorContext;
import org.activiti.engine.impl.jobexecutor.SingleJobExecutorContext;
import org.activiti.engine.management.TableMetaData;
import org.activiti.engine.management.TablePageQuery;
import org.activiti.engine.runtime.JobQuery;
......@@ -61,7 +64,13 @@ public class ManagementServiceImpl extends ServiceImpl implements ManagementServ
}
public void executeJob(String jobId) {
commandExecutor.execute(new ExecuteJobsCmd(jobId));
JobExecutorContext jobExecutorContext = new SingleJobExecutorContext();
Context.setJobExecutorContext(jobExecutorContext);
try {
commandExecutor.execute(new ExecuteJobsCmd(jobId));
} finally {
Context.removeJobExecutorContext();
}
}
public void deleteJob(String jobId) {
......
......@@ -127,8 +127,7 @@ public class ExecuteAsyncRunnable implements Runnable {
commandExecutor.execute(commandConfig, cmd);
// Dispatch an event, indicating job execution failed in a
// try-catch block, to prevent the original
// exception to be swallowed
// try-catch block, to prevent the original exception to be swallowed
if (commandContext.getEventDispatcher().isEnabled()) {
try {
commandContext.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createEntityExceptionEvent(ActivitiEventType.JOB_EXECUTION_FAILURE, job, exception));
......
......@@ -15,6 +15,7 @@ package org.activiti.engine.impl.calendar;
import java.util.Date;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.runtime.ClockReader;
import org.joda.time.DateTime;
import org.joda.time.Period;
......@@ -32,7 +33,8 @@ public class DueDateBusinessCalendar extends BusinessCalendarImpl {
try {
// check if due period was specified
if (duedate.startsWith("P")) {
return DateTime.now().plus(Period.parse(duedate)).toDate();
DateTime dateTime = new DateTime(Context.getProcessEngineConfiguration().getClock().getCurrentTime().getTime());
return dateTime.plus(Period.parse(duedate)).toDate();
}
return DateTime.parse(duedate).toDate();
......
......@@ -17,13 +17,9 @@ import java.io.Serializable;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.ActivitiIllegalArgumentException;
import org.activiti.engine.JobNotFoundException;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti.engine.impl.cfg.TransactionState;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.jobexecutor.FailedJobListener;
import org.activiti.engine.impl.jobexecutor.JobExecutorContext;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.slf4j.Logger;
......@@ -69,44 +65,17 @@ public class ExecuteJobsCmd implements Command<Object>, Serializable {
}
JobExecutorContext jobExecutorContext = Context.getJobExecutorContext();
if (jobExecutorContext != null) { // if null, then we are not called by
// the job executor
if (jobExecutorContext != null) { // if null, then we are not called by the job executor
jobExecutorContext.setCurrentJob(job);
}
FailedJobListener failedJobListener = null;
try {
// When transaction is rolled back, decrement retries
failedJobListener = new FailedJobListener(commandContext.getProcessEngineConfiguration().getCommandExecutor(), jobId);
commandContext.getTransactionContext().addTransactionListener(TransactionState.ROLLED_BACK, failedJobListener);
job.execute(commandContext);
if (commandContext.getEventDispatcher().isEnabled()) {
commandContext.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createEntityEvent(ActivitiEventType.JOB_EXECUTION_SUCCESS, job));
}
} catch (Throwable exception) {
failedJobListener.setException(exception);
// Dispatch an event, indicating job execution failed in a try-catch
// block, to prevent the original
// exception to be swallowed
if (commandContext.getEventDispatcher().isEnabled()) {
try {
commandContext.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createEntityExceptionEvent(ActivitiEventType.JOB_EXECUTION_FAILURE, job, exception));
} catch (Throwable ignore) {
log.warn("Exception occured while dispatching job failure event, ignoring.", ignore);
}
}
// Finally, Throw the exception to indicate the ExecuteJobCmd failed
throw new ActivitiException("Job " + jobId + " failed", exception);
} finally {
if (jobExecutorContext != null) {
jobExecutorContext.setCurrentJob(null);
}
}
}
return null;
}
......
......@@ -18,6 +18,8 @@ import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import org.activiti.bpmn.model.FlowElement;
import org.activiti.bpmn.model.ServiceTask;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.delegate.event.ActivitiEventDispatcher;
......@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
/**
* @author Saeid Mirzaei
* @author Joram Barrez
*/
public class JobRetryCmd implements Command<Object> {
......@@ -60,11 +63,19 @@ public class JobRetryCmd implements Command<Object> {
return null;
}
ActivityImpl activity = getCurrentActivity(commandContext, job);
ProcessEngineConfiguration processEngineConfig = commandContext.getProcessEngineConfiguration();
ExecutionEntity executionEntity = fetchExecutionEntity(commandContext, job.getExecutionId());
FlowElement currentFlowElement = executionEntity != null ? executionEntity.getCurrentFlowElement() : null;
String failedJobRetryTimeCycleValue = null;
if (currentFlowElement instanceof ServiceTask) {
failedJobRetryTimeCycleValue = ((ServiceTask) currentFlowElement).getFailedJobRetryTimeCycleValue();
}
if (activity == null || activity.getFailedJobRetryTimeCycleValue() == null) {
log.info("activitiy or FailedJobRetryTimerCycleValue is null in job " + jobId + "'. only decrementing retries.");
if (currentFlowElement == null || failedJobRetryTimeCycleValue == null) {
log.debug("activitiy or FailedJobRetryTimerCycleValue is null in job " + jobId + "'. only decrementing retries.");
job.setRetries(job.getRetries() - 1);
job.setLockOwner(null);
job.setLockExpirationTime(null);
......@@ -77,26 +88,24 @@ public class JobRetryCmd implements Command<Object> {
}
} else {
String failedJobRetryTimeCycle = activity.getFailedJobRetryTimeCycleValue();
try {
DurationHelper durationHelper = new DurationHelper(failedJobRetryTimeCycle, processEngineConfig.getClock());
DurationHelper durationHelper = new DurationHelper(failedJobRetryTimeCycleValue, processEngineConfig.getClock());
job.setLockOwner(null);
job.setLockExpirationTime(null);
job.setDuedate(durationHelper.getDateAfter());
if (job.getExceptionMessage() == null) { // is it the first
// exception
log.debug("Applying JobRetryStrategy '" + failedJobRetryTimeCycle + "' the first time for job " + job.getId() + " with " + durationHelper.getTimes() + " retries");
if (job.getExceptionMessage() == null) { // is it the first exception
log.debug("Applying JobRetryStrategy '" + failedJobRetryTimeCycleValue + "' the first time for job " + job.getId() + " with " + durationHelper.getTimes() + " retries");
// then change default retries to the ones configured
job.setRetries(durationHelper.getTimes());
} else {
log.debug("Decrementing retries of JobRetryStrategy '" + failedJobRetryTimeCycle + "' for job " + job.getId());
log.debug("Decrementing retries of JobRetryStrategy '" + failedJobRetryTimeCycleValue + "' for job " + job.getId());
}
job.setRetries(job.getRetries() - 1);
} catch (Exception e) {
throw new ActivitiException("failedJobRetryTimeCylcle has wrong format:" + failedJobRetryTimeCycle, exception);
throw new ActivitiException("failedJobRetryTimeCylcle has wrong format:" + failedJobRetryTimeCycleValue, exception);
}
}
......@@ -173,6 +182,9 @@ public class JobRetryCmd implements Command<Object> {
}
private ExecutionEntity fetchExecutionEntity(CommandContext commandContext, String executionId) {
if (executionId == null) {
return null;
}
return commandContext.getExecutionEntityManager().findExecutionById(executionId);
}
......
......@@ -27,10 +27,13 @@ import org.activiti.engine.delegate.event.ActivitiEventDispatcher;
import org.activiti.engine.impl.agenda.Agenda;
import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti.engine.impl.cfg.TransactionContext;
import org.activiti.engine.impl.cfg.TransactionState;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.db.DbSqlSession;
import org.activiti.engine.impl.history.HistoryManager;
import org.activiti.engine.impl.jobexecutor.FailedJobCommandFactory;
import org.activiti.engine.impl.jobexecutor.FailedJobListener;
import org.activiti.engine.impl.jobexecutor.JobExecutorContext;
import org.activiti.engine.impl.persistence.entity.AttachmentEntityManager;
import org.activiti.engine.impl.persistence.entity.ByteArrayEntityManager;
import org.activiti.engine.impl.persistence.entity.CommentEntityManager;
......@@ -48,6 +51,7 @@ import org.activiti.engine.impl.persistence.entity.HistoricTaskInstanceEntityMan
import org.activiti.engine.impl.persistence.entity.HistoricVariableInstanceEntityManager;
import org.activiti.engine.impl.persistence.entity.IdentityInfoEntityManager;
import org.activiti.engine.impl.persistence.entity.IdentityLinkEntityManager;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.persistence.entity.JobEntityManager;
import org.activiti.engine.impl.persistence.entity.MembershipIdentityManager;
import org.activiti.engine.impl.persistence.entity.ModelEntityManager;
......@@ -82,17 +86,10 @@ public class CommandContext {
protected ProcessEngineConfigurationImpl processEngineConfiguration;
protected FailedJobCommandFactory failedJobCommandFactory;
protected List<CommandContextCloseListener> closeListeners;
protected Map<String, Object> attributes; // General-purpose storing of
// anything during the lifetime of
// a command context
protected Map<String, Object> attributes; // General-purpose storing of anything during the lifetime of a command context
protected Agenda agenda = new Agenda(this);
protected Map<String, ExecutionEntity> involvedExecutions = new HashMap<String, ExecutionEntity>(1); // The
// executions
// involved
// with
// the
// command
protected Map<String, ExecutionEntity> involvedExecutions = new HashMap<String, ExecutionEntity>(1); // The executions involved with the command
protected Object result = null;
public void performOperation(AtomicOperation executionOperation, InterpretableExecution execution) {
......@@ -126,10 +123,8 @@ public class CommandContext {
}
public void close() {
// the intention of this method is that all resources are closed
// properly, even
// if exceptions occur in close or flush methods of the sessions or the
// transaction context.
// the intention of this method is that all resources are closed properly, even if exceptions occur
// in close or flush methods of the sessions or the transaction context.
try {
try {
......@@ -240,6 +235,19 @@ public class CommandContext {
public void exception(Throwable exception) {
if (this.exception == null) {
this.exception = exception;
JobExecutorContext jobExecutorContext = Context.getJobExecutorContext();
if (jobExecutorContext != null) {
JobEntity jobEntity = null;
if (jobExecutorContext.getCurrentJob() != null) {
jobEntity = jobExecutorContext.getCurrentJob();
}
FailedJobListener failedJobListener = new FailedJobListener(
getProcessEngineConfiguration().getCommandExecutor(), jobEntity.getId());
failedJobListener.setException(exception);
this.getTransactionContext().addTransactionListener(TransactionState.ROLLED_BACK, failedJobListener);
}
} else {
if (Context.isExecutionContextActive()) {
LogMDC.putMDCExecution(Context.getExecutionContext().getExecution());
......
......@@ -40,9 +40,7 @@ public class CommandContextInterceptor extends AbstractCommandInterceptor {
boolean contextReused = false;
// We need to check the exception, because the transaction can be in a
// rollback state,
// and some other command is being fired to compensate (eg. decrementing
// job retries)
// rollback state, and some other command is being fired to compensate (eg. decrementing job retries)
if (!config.isContextReusePossible() || context == null || context.getException() != null) {
context = commandContextFactory.createCommandContext(command);
} else {
......
......@@ -19,6 +19,7 @@ import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.ActivitiIllegalArgumentException;
......@@ -163,15 +164,7 @@ public class JobQueryTest extends PluggableActivitiTestCase {
}
public void testQueryByExecutable() {
processEngineConfiguration.getClock().setCurrentTime(new Date(timerThreeFireTime.getTime() + ONE_SECOND)); // all
// jobs
// should
// be
// executable
// at
// t3
// +
// 1hour.1second
processEngineConfiguration.getClock().setCurrentTime(new Date(timerThreeFireTime.getTime() + ONE_SECOND)); // all obs should be executable at t3 + 1hour.1second
JobQuery query = managementService.createJobQuery().executable();
verifyQueryResults(query, 4);
......@@ -242,7 +235,7 @@ public class JobQueryTest extends PluggableActivitiTestCase {
query = managementService.createJobQuery().processInstanceId(processInstance.getId()).withException();
verifyFailedJob(query, processInstance);
}
@Deployment(resources = { "org/activiti/engine/test/api/mgmt/ManagementServiceTest.testGetJobExceptionStacktrace.bpmn20.xml" })
public void testQueryByExceptionMessage() {
JobQuery query = managementService.createJobQuery().exceptionMessage(EXCEPTION_MESSAGE);
......@@ -375,8 +368,7 @@ public class JobQueryTest extends PluggableActivitiTestCase {
// start a process with a failing job
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("exceptionInJobExecution");
// The execution is waiting in the first usertask. This contains a
// boundary
// The execution is waiting in the first usertask. This contains a boundary
// timer event which we will execute manual for testing purposes.
Job timerJob = managementService.createJobQuery().processInstanceId(processInstance.getId()).singleResult();
......@@ -386,7 +378,7 @@ public class JobQueryTest extends PluggableActivitiTestCase {
managementService.executeJob(timerJob.getId());
fail("RuntimeException from within the script task expected");
} catch (RuntimeException re) {
assertTextPresent(EXCEPTION_MESSAGE, re.getCause().getMessage());
assertTextPresent(EXCEPTION_MESSAGE, re.getMessage());
}
return processInstance;
}
......
......@@ -518,10 +518,10 @@ public class MessageBoundaryEventTest extends PluggableActivitiTestCase {
// After setting the clock to time '1 hour and 5 seconds', the timer
// should fire.
processEngineConfiguration.getClock().setCurrentTime(new Date(startTime.getTime() + ((60 * 60 * 1000) + 5000)));
waitForJobExecutorOnCondition(12000L, 100L, new Callable<Boolean>() {
waitForJobExecutorOnCondition(120000000L, 100L, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return taskService.createTaskQuery().count() == 2;
return taskService.createTaskQuery().count() == 1;
}
});
......
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activiti.engine.test.jobexecutor;
import java.util.Date;
import java.util.concurrent.Callable;
import org.activiti.engine.impl.test.JobTestHelper;
import org.activiti.engine.runtime.JobQuery;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.test.Deployment;
import org.activiti6.AbstractActvitiTest;
import org.junit.Assert;
import org.junit.Test;
/**
* @author Joram Barrez
*/
public class JobExecutorExceptionsTest extends AbstractActvitiTest {
@Test
@Deployment(resources = { "org/activiti/engine/test/api/mgmt/ManagementServiceTest.testGetJobExceptionStacktrace.bpmn20.xml" })
public void testQueryByExceptionWithRealJobExecutor() {
JobQuery query = managementService.createJobQuery().withException();
Assert.assertEquals(0, query.count());
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("exceptionInJobExecution");
// Timer is set for 4 hours, so move clock 5 hours
activitiRule.getProcessEngine().getProcessEngineConfiguration().getClock().setCurrentTime(new Date(new Date().getTime() + 5 * 60 * 60 * 1000));
// The execution is waiting in the first usertask. This contains a
// boundary timer event which we will execute manual for testing purposes.
JobTestHelper.waitForJobExecutorOnCondition(activitiRule, 5000000000L, 100L, new Callable<Boolean>() {
public Boolean call() throws Exception {
return managementService.createJobQuery().withException().count() == 1;
}
});
query = managementService.createJobQuery().processInstanceId(processInstance.getId()).withException();
Assert.assertEquals(1, query.count());
}
}
......@@ -68,6 +68,11 @@ public class AbstractActvitiTest {
this.managementService = cachedProcessEngine.getManagementService();
}
@After
public void resetClock() {
activitiRule.getProcessEngine().getProcessEngineConfiguration().getClock().reset();
}
@After
public void logCommandInvokerDebugInfo() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册