提交 75630e1d 编写于 作者: M martin.grofcik 提交者: fheremans

JOB_CANCELED

上级 99381d30
......@@ -61,7 +61,12 @@ public enum ActivitiEventType {
* Timer has been fired successfully.
*/
TIMER_FIRED,
/**
* Timer has been cancelled (e.g. user task on which it was bounded has been completed earlier than expected)
*/
JOB_CANCELED,
/**
* A job has been successfully executed.
*/
......
......@@ -12,25 +12,10 @@
*/
package org.activiti.engine.impl;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import org.activiti.engine.ActivitiIllegalArgumentException;
import org.activiti.engine.ManagementService;
import org.activiti.engine.event.EventLogEntry;
import org.activiti.engine.impl.cmd.CustomSqlExecution;
import org.activiti.engine.impl.cmd.DeleteEventLogEntry;
import org.activiti.engine.impl.cmd.DeleteJobCmd;
import org.activiti.engine.impl.cmd.ExecuteCustomSqlCmd;
import org.activiti.engine.impl.cmd.ExecuteJobsCmd;
import org.activiti.engine.impl.cmd.GetEventLogEntriesCmd;
import org.activiti.engine.impl.cmd.GetJobExceptionStacktraceCmd;
import org.activiti.engine.impl.cmd.GetPropertiesCmd;
import org.activiti.engine.impl.cmd.GetTableCountCmd;
import org.activiti.engine.impl.cmd.GetTableMetaDataCmd;
import org.activiti.engine.impl.cmd.GetTableNameCmd;
import org.activiti.engine.impl.cmd.SetJobRetriesCmd;
import org.activiti.engine.impl.cmd.*;
import org.activiti.engine.impl.db.DbSqlSession;
import org.activiti.engine.impl.db.DbSqlSessionFactory;
import org.activiti.engine.impl.interceptor.Command;
......@@ -40,6 +25,10 @@ import org.activiti.engine.management.TableMetaData;
import org.activiti.engine.management.TablePageQuery;
import org.activiti.engine.runtime.JobQuery;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
/**
* @author Tom Baeyens
......@@ -66,7 +55,7 @@ public class ManagementServiceImpl extends ServiceImpl implements ManagementServ
}
public void deleteJob(String jobId) {
commandExecutor.execute(new DeleteJobCmd(jobId));
commandExecutor.execute(new CancelJobCmd(jobId));
}
public void setJobRetries(String jobId, int retries) {
......
......@@ -13,20 +13,26 @@
package org.activiti.engine.impl.bpmn.behavior;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti.engine.impl.Condition;
import org.activiti.engine.impl.bpmn.parser.BpmnParse;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.persistence.entity.ExecutionEntity;
import org.activiti.engine.impl.persistence.entity.JobEntity;
import org.activiti.engine.impl.pvm.PvmTransition;
import org.activiti.engine.impl.pvm.delegate.ActivityExecution;
import org.activiti.engine.impl.pvm.process.ActivityImpl;
import org.activiti.engine.impl.pvm.runtime.InterpretableExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Helper class for implementing BPMN 2.0 activities, offering convenience
* methods specific to BPMN 2.0.
......@@ -50,8 +56,12 @@ public class BpmnActivityBehavior implements Serializable {
* the process instance. If multiple sequencer flow are selected, multiple,
* parallel paths of executions are created.
*/
public void performDefaultOutgoingBehavior(ActivityExecution activityExceution) {
performOutgoingBehavior(activityExceution, true, false, null);
public void performDefaultOutgoingBehavior(ActivityExecution activityExecution) {
ActivityImpl activity = (ActivityImpl) activityExecution.getActivity();
if (!(activity.getActivityBehavior() instanceof IntermediateCatchEventActivityBehavior)) {
dispatchJobCanceledEvents(activityExecution);
}
performOutgoingBehavior(activityExecution, true, false, null);
}
/**
......@@ -68,6 +78,22 @@ public class BpmnActivityBehavior implements Serializable {
performOutgoingBehavior(activityExecution, false, false, null);
}
/**
* dispatch job canceled event for job associated with given execution entity
* @param activityExecution
*/
protected void dispatchJobCanceledEvents(ActivityExecution activityExecution) {
if (activityExecution instanceof ExecutionEntity) {
List<JobEntity> jobs = ((ExecutionEntity) activityExecution).getJobs();
for (JobEntity job: jobs) {
if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityEvent(ActivitiEventType.JOB_CANCELED, job));
}
}
}
}
/**
* Actual implementation of leaving an activity.
*
......
......@@ -29,7 +29,7 @@ import org.activiti.engine.impl.bpmn.parser.BpmnParser;
import org.activiti.engine.impl.bpmn.parser.EventSubscriptionDeclaration;
import org.activiti.engine.impl.cfg.IdGenerator;
import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti.engine.impl.cmd.DeleteJobsCmd;
import org.activiti.engine.impl.cmd.CancelJobsCmd;
import org.activiti.engine.impl.cmd.DeploymentSettings;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.db.DbSqlSession;
......@@ -277,7 +277,7 @@ public class BpmnDeployer implements Deployer {
.findJobsByConfiguration(TimerStartEventJobHandler.TYPE, processDefinition.getKey());
for (Job job :jobsToDelete) {
new DeleteJobsCmd(job.getId()).execute(Context.getCommandContext());
new CancelJobsCmd(job.getId()).execute(Context.getCommandContext());
}
}
......
package org.activiti.engine.impl.cmd;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.persistence.entity.JobEntity;
/**
* This class sends JOB_CANCELLED event and deletes job
*/
public class CancelJobCmd extends DeleteJobCmd {
public CancelJobCmd(String jobId) {
super(jobId);
}
@Override
public Object execute(CommandContext commandContext) {
JobEntity jobToDelete = getJobToDelete(commandContext);
sendCancelEvent(jobToDelete);
jobToDelete.delete();
return null;
}
private void sendCancelEvent(JobEntity jobToDelete) {
if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityEvent(ActivitiEventType.JOB_CANCELED, jobToDelete));
}
}
}
......@@ -17,24 +17,28 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.persistence.entity.JobEntity;
/**
* Send job cancelled event and delete job
*
* @author Tom Baeyens
*/
public class DeleteJobsCmd implements Command<Void>, Serializable {
public class CancelJobsCmd implements Command<Void>, Serializable {
private static final long serialVersionUID = 1L;
List<String> jobIds;
public DeleteJobsCmd(List<String> jobIds) {
public CancelJobsCmd(List<String> jobIds) {
this.jobIds = jobIds;
}
public DeleteJobsCmd(String jobId) {
public CancelJobsCmd(String jobId) {
this.jobIds = new ArrayList<String>();
jobIds.add(jobId);
}
......@@ -48,6 +52,11 @@ public class DeleteJobsCmd implements Command<Void>, Serializable {
if(jobToDelete != null) {
// When given job doesn't exist, ignore
if (commandContext.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
commandContext.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityEvent(ActivitiEventType.JOB_CANCELED, jobToDelete));
}
jobToDelete.delete();
}
}
......
......@@ -29,6 +29,13 @@ public class DeleteJobCmd implements Command<Object>, Serializable {
}
public Object execute(CommandContext commandContext) {
JobEntity jobToDelete = getJobToDelete(commandContext);
jobToDelete.delete();
return null;
}
protected JobEntity getJobToDelete(CommandContext commandContext) {
if (jobId == null) {
throw new ActivitiIllegalArgumentException("jobId is null");
}
......@@ -40,7 +47,7 @@ public class DeleteJobCmd implements Command<Object>, Serializable {
if (job == null) {
throw new ActivitiObjectNotFoundException("No job found with id '" + jobId + "'", Job.class);
}
// We need to check if the job was locked, ie acquired by the job acquisition thread
// This happens if the the job was already acquired, but not yet executed.
// In that case, we can't allow to delete the job.
......@@ -48,9 +55,7 @@ public class DeleteJobCmd implements Command<Object>, Serializable {
{
throw new ActivitiException("Cannot delete job when the job is being executed. Try again later.");
}
job.delete();
return null;
return job;
}
}
......@@ -16,6 +16,8 @@ package org.activiti.engine.impl.persistence.entity;
import java.util.List;
import java.util.Map;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti.engine.impl.DeploymentQueryImpl;
import org.activiti.engine.impl.Page;
import org.activiti.engine.impl.ProcessDefinitionQueryImpl;
......@@ -109,6 +111,11 @@ public class DeploymentEntityManager extends AbstractManager {
if (nrOfVersions - nrOfProcessDefinitionsWithSameKey <= 1) {
for (Job job : timerStartJobs) {
if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityEvent(ActivitiEventType.JOB_CANCELED, job, null, null, processDefinition.getId()));
}
((JobEntity)job).delete();
}
}
......
......@@ -19,6 +19,8 @@ import java.util.List;
import java.util.Map;
import org.activiti.engine.ActivitiIllegalArgumentException;
import org.activiti.engine.delegate.event.ActivitiEventType;
import org.activiti.engine.delegate.event.impl.ActivitiEventBuilder;
import org.activiti.engine.impl.JobQueryImpl;
import org.activiti.engine.impl.Page;
import org.activiti.engine.impl.cfg.TransactionListener;
......@@ -77,6 +79,10 @@ public class JobEntityManager extends AbstractManager {
.findTimersByExecutionId(execution.getId());
for (TimerEntity timer: timers) {
if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityEvent(ActivitiEventType.JOB_CANCELED, timer));
}
timer.delete();
}
}
......
......@@ -12,10 +12,6 @@
*/
package org.activiti.engine.test.api.event;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import org.activiti.engine.delegate.event.ActivitiEntityEvent;
import org.activiti.engine.delegate.event.ActivitiEvent;
import org.activiti.engine.delegate.event.ActivitiEventType;
......@@ -24,8 +20,14 @@ import org.activiti.engine.impl.util.DefaultClockImpl;
import org.activiti.engine.runtime.Clock;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.task.Task;
import org.activiti.engine.test.Deployment;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
/**
* Test case for all {@link ActivitiEvent}s related to jobs.
*
......@@ -91,20 +93,19 @@ public class JobEventsTest extends PluggableActivitiTestCase {
assertEquals(ActivitiEventType.JOB_EXECUTION_SUCCESS, event.getType());
checkEventContext(event, theJob, true);
}
/**
/**
* Timer repetition
*/
@Deployment
public void testRepetitionJobEntityEvents() throws Exception {
Clock previousClock = processEngineConfiguration.getClock();
Calendar testCal = new GregorianCalendar();
Clock testClock = new DefaultClockImpl();
processEngineConfiguration.setClock(testClock);
testClock.setCurrentTime(testCal.getTime());
testClock.setCurrentTime(new Date(0));
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("testRepetitionJobEvents");
Job theJob = managementService.createJobQuery().processInstanceId(processInstance.getId()).singleResult();
assertNotNull(theJob);
......@@ -122,17 +123,16 @@ public class JobEventsTest extends PluggableActivitiTestCase {
listener.clearEventsReceived();
// fire timer for the first time
testCal.add(Calendar.SECOND, 20);
testClock.setCurrentTime(testCal.getTime());
waitForJobExecutorToProcessAllJobs(5000, 200);
testClock.setCurrentTime(new Date(10000));
waitForJobExecutorToProcessAllJobs(20000, 100);
// fire timer for the second time
testCal.add(Calendar.SECOND, 20);
testClock.setCurrentTime(testCal.getTime());
waitForJobExecutorToProcessAllJobs(5000, 200);
testClock.setCurrentTime(new Date(20000));
waitForJobExecutorToProcessAllJobs(20000, 100);
// all jobs should have been done now
assertEquals(0, managementService.createJobQuery().count());
// do not fire timer
testClock.setCurrentTime(new Date(30000));
waitForJobExecutorToProcessAllJobs(20000, 100);
// count timer fired events
int timerFiredCount = 0;
......@@ -148,48 +148,106 @@ public class JobEventsTest extends PluggableActivitiTestCase {
assertEquals(2, timerFiredCount);
}
/**
* Test TIMER_FIRED event for timer start bpmn event.
*/
@Deployment
public void testTimerFiredForTimerStart() throws Exception {
// there should be one job after process definition deployment
// Force timer to start the process
Calendar tomorrow = Calendar.getInstance();
tomorrow.add(Calendar.DAY_OF_YEAR, 1);
processEngineConfiguration.getClock().setCurrentTime(tomorrow.getTime());
waitForJobExecutorToProcessAllJobs(2000, 100);
// Check Timer fired event has been dispatched
assertEquals(3, listener.getEventsReceived().size());
assertEquals(ActivitiEventType.TIMER_FIRED, listener.getEventsReceived().get(0).getType());
public void testJobCanceledEventOnBoundaryEvent() throws Exception {
Clock previousClock = processEngineConfiguration.getClock();
Clock testClock = new DefaultClockImpl();
processEngineConfiguration.setClock(testClock);
testClock.setCurrentTime(new Date(0));
runtimeService.startProcessInstanceByKey("testTimerCancelledEvent");
listener.clearEventsReceived();
Task task = taskService.createTaskQuery().singleResult();
taskService.complete(task.getId());
checkEventCount(1, ActivitiEventType.JOB_CANCELED);
}
/**
* Test TIMER_FIRED event for intermediate timer bpmn event.
*/
@Deployment
public void testTimerFiredForIntermediateTimer() throws Exception {
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("testTimerFiredForIntermediateTimer");
// Force timer to start the process
Calendar tomorrow = Calendar.getInstance();
tomorrow.add(Calendar.DAY_OF_YEAR, 1);
processEngineConfiguration.getClock().setCurrentTime(tomorrow.getTime());
waitForJobExecutorToProcessAllJobs(2000, 100);
// Check Timer fired event has been dispatched
// there is an issue (ENTITY_DELETED for job is generated twice)
boolean timerFired = false;
for(ActivitiEvent event : listener.getEventsReceived()) {
if (ActivitiEventType.TIMER_FIRED.equals(event.getType())) {
timerFired = true;
@Deployment(resources = "org/activiti/engine/test/api/event/JobEventsTest.testJobCanceledEventOnBoundaryEvent.bpmn20.xml")
public void testJobCanceledEventByManagementService() throws Exception {
// GIVEN
processEngineConfiguration.getClock().setCurrentTime(new Date(0));
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("testTimerCancelledEvent");
listener.clearEventsReceived();
Job job = managementService.createJobQuery().processInstanceId(processInstance.getId()).singleResult();
// WHEN
managementService.deleteJob(job.getId());
// THEN
checkEventCount(1, ActivitiEventType.JOB_CANCELED);
}
public void testJobCanceledEventOnProcessRedeploy() throws Exception {
// GIVEN
// deploy process definition
String deployment1 = repositoryService.createDeployment().addClasspathResource("org/activiti/engine/test/api/event/JobEventsTest.testTimerFiredForTimerStart.bpmn20.xml").deploy().getId();
listener.clearEventsReceived();
// WHEN
String deployment2 = repositoryService.createDeployment().addClasspathResource("org/activiti/engine/test/api/event/JobEventsTest.testTimerFiredForTimerStart.bpmn20.xml").deploy().getId();
// THEN
checkEventCount(1, ActivitiEventType.JOB_CANCELED);
repositoryService.deleteDeployment(deployment2);
repositoryService.deleteDeployment(deployment1);
}
private void checkEventCount(int expectedCount, ActivitiEventType eventType) {// count timer cancelled events
int timerCancelledCount = 0;
List<ActivitiEvent> eventsReceived = listener.getEventsReceived();
for (ActivitiEvent eventReceived : eventsReceived) {
if (eventType.equals(eventReceived.getType())) {
timerCancelledCount++;
}
}
assertTrue(timerFired);
assertEquals(eventType.name() + " event was expected "+ expectedCount+" times.", expectedCount, timerCancelledCount);
}
/**
/**
* Test TIMER_FIRED event for timer start bpmn event.
*/
@Deployment
public void testTimerFiredForTimerStart() throws Exception {
// there should be one job after process definition deployment
// Force timer to start the process
Calendar tomorrow = Calendar.getInstance();
tomorrow.add(Calendar.DAY_OF_YEAR, 1);
processEngineConfiguration.getClock().setCurrentTime(tomorrow.getTime());
waitForJobExecutorToProcessAllJobs(2000, 100);
// Check Timer fired event has been dispatched
assertEquals(3, listener.getEventsReceived().size());
assertEquals(ActivitiEventType.TIMER_FIRED, listener.getEventsReceived().get(0).getType());
checkEventCount(0, ActivitiEventType.JOB_CANCELED);
}
/**
* Test TIMER_FIRED event for intermediate timer bpmn event.
*/
@Deployment
public void testTimerFiredForIntermediateTimer() throws Exception {
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("testTimerFiredForIntermediateTimer");
// Force timer to start the process
Calendar tomorrow = Calendar.getInstance();
tomorrow.add(Calendar.DAY_OF_YEAR, 1);
processEngineConfiguration.getClock().setCurrentTime(tomorrow.getTime());
waitForJobExecutorToProcessAllJobs(2000, 100);
checkEventCount(0, ActivitiEventType.JOB_CANCELED);
checkEventCount(1, ActivitiEventType.TIMER_FIRED);
}
/**
* Test create, update and delete events of jobs entities.
*/
......
......@@ -22,7 +22,7 @@ import java.util.UUID;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.ActivitiIllegalArgumentException;
import org.activiti.engine.impl.cmd.DeleteJobsCmd;
import org.activiti.engine.impl.cmd.CancelJobsCmd;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.interceptor.CommandExecutor;
......@@ -114,7 +114,7 @@ public class JobQueryTest extends PluggableActivitiTestCase {
@Override
protected void tearDown() throws Exception {
repositoryService.deleteDeployment(deploymentId, true);
commandExecutor.execute(new DeleteJobsCmd(messageId));
commandExecutor.execute(new CancelJobsCmd(messageId));
super.tearDown();
}
......
......@@ -19,7 +19,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.activiti.engine.impl.cmd.DeleteJobsCmd;
import org.activiti.engine.impl.cmd.CancelJobsCmd;
import org.activiti.engine.impl.interceptor.CommandExecutor;
import org.activiti.engine.impl.test.PluggableActivitiTestCase;
import org.activiti.engine.impl.util.IoUtil;
......@@ -233,7 +233,7 @@ public class StartTimerEventTest extends PluggableActivitiTestCase {
private void cleanDB() {
String jobId = managementService.createJobQuery().singleResult().getId();
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutor();
commandExecutor.execute(new DeleteJobsCmd(jobId));
commandExecutor.execute(new CancelJobsCmd(jobId));
}
}
......@@ -3,6 +3,7 @@
*/
package org.activiti.engine.test.jobexecutor;
import org.activiti.engine.impl.cmd.CancelJobsCmd;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.interceptor.CommandExecutor;
......@@ -67,7 +68,7 @@ public class JobExecutorCmdExceptionTest extends PluggableActivitiTestCase {
public void testJobCommandsWith3Exceptions() {
tweetExceptionHandler.setExceptionsRemaining(3);
commandExecutor.execute(new Command<String>() {
String jobId = commandExecutor.execute(new Command<String>() {
public String execute(CommandContext commandContext) {
MessageEntity message = createTweetExceptionMessage();
......
<?xml version="1.0" encoding="UTF-8"?>
<definitions
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
targetNamespace="Examples">
<process id="testTimerCancelledEvent">
<startEvent id="theStart" />
<sequenceFlow sourceRef="theStart" targetRef="task" />
<userTask id="task" />
<boundaryEvent attachedToRef="task" id="timer" cancelActivity="false">
<timerEventDefinition>
<timeDuration>PT10H</timeDuration>
</timerEventDefinition>
</boundaryEvent>
<sequenceFlow sourceRef="task" targetRef="theEnd" />
<endEvent id="theEnd" />
</process>
</definitions>
......@@ -1087,6 +1087,16 @@ in your subclass, BPMN-event throwing can be prevented. The classes involved are
<entry>A timer has been fired. The event contains the job that was executed?</entry>
<entry><literal>org.activiti...ActivitiEntityEvent</literal></entry>
</row>
<row>
<entry>JOB_CANCELED</entry>
<entry>A job has been canceled. The event contains the job that was canceled. Job can be canceled by
API call, task was completed and associated boundary timer was canceled, on the new process definition
deployment.
</entry>
<entry>
<literal>org.activiti...ActivitiEntityEvent</literal>
</entry>
</row>
<row>
<entry>ACTIVITY_STARTED</entry>
<entry>An activity is starting to execute</entry>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册