提交 ff016cde 编写于 作者: R roman.smirnov

ACT-1046: - Added an interface "FailedJobCommandFactory" and an implementation...

ACT-1046: - Added an interface "FailedJobCommandFactory" and an implementation of it "DefaultFailedJobCommandFactory", renamed "DecrementJobRetriesCmd" to "FailedJobListener" to able to configure a delay between retries and amount of retries;
- Added "parseIntermediateCatchEvent()" and "parseBoundaryEvent()" in BPMNParseListener to allow some extensions to these elements (e.g. specify a specific failed job retry logic)
上级 ad4fd13b
......@@ -203,7 +203,13 @@ public class CdiEventSupportBpmnParseListener implements BpmnParseListener {
addStartEventListener(activity);
addEndEventListener(activity);
}
@Override
public void parseIntermediateCatchEvent(Element intermediateEventElement, ScopeImpl scope, ActivityImpl activity) {
}
@Override
public void parseBoundaryEvent(Element boundaryEventElement, ScopeImpl scopeElement, ActivityImpl nestedActivity) {
}
}
......@@ -104,4 +104,10 @@ public class AbstractBpmnParseListener implements BpmnParseListener {
public void parseIntermediateThrowEvent(Element intermediateEventElement, ScopeImpl scope, ActivityImpl activity) {
}
public void parseIntermediateCatchEvent(Element intermediateEventElement, ScopeImpl scope, ActivityImpl activity) {
}
public void parseBoundaryEvent(Element boundaryEventElement, ScopeImpl scopeElement, ActivityImpl nestedActivity) {
}
}
......@@ -976,6 +976,10 @@ public class BpmnParse extends Parse {
addError("Unsupported intermediate catch event type", intermediateEventElement);
}
for (BpmnParseListener parseListener : parseListeners) {
parseListener.parseIntermediateCatchEvent(intermediateEventElement, scopeElement, nestedActivity);
}
parseExecutionListenersOnScope(intermediateEventElement, nestedActivity);
return nestedActivity;
......@@ -2153,6 +2157,10 @@ public class BpmnParse extends Parse {
} else {
addError("Unsupported boundary event type", boundaryEventElement);
}
for (BpmnParseListener parseListener : parseListeners) {
parseListener.parseBoundaryEvent(boundaryEventElement, scopeElement, nestedActivity);
}
nestedActivity.setActivityBehavior(behavior);
}
......
......@@ -63,5 +63,7 @@ public interface BpmnParseListener {
void parseTransaction(Element transactionElement, ScopeImpl scope, ActivityImpl activity);
void parseCompensateEventDefinition(Element compensateEventDefinition, ActivityImpl compensationActivity);
void parseIntermediateThrowEvent(Element intermediateEventElement, ScopeImpl scope, ActivityImpl activity);
void parseIntermediateCatchEvent(Element intermediateEventElement, ScopeImpl scope, ActivityImpl activity);
void parseBoundaryEvent(Element boundaryEventElement, ScopeImpl scopeElement, ActivityImpl nestedActivity);
}
......@@ -85,7 +85,9 @@ import org.activiti.engine.impl.interceptor.DelegateInterceptor;
import org.activiti.engine.impl.interceptor.SessionFactory;
import org.activiti.engine.impl.jobexecutor.AsyncContinuationJobHandler;
import org.activiti.engine.impl.jobexecutor.CallerRunsRejectedJobsHandler;
import org.activiti.engine.impl.jobexecutor.DefaultFailedJobCommandFactory;
import org.activiti.engine.impl.jobexecutor.DefaultJobExecutor;
import org.activiti.engine.impl.jobexecutor.FailedJobCommandFactory;
import org.activiti.engine.impl.jobexecutor.JobExecutor;
import org.activiti.engine.impl.jobexecutor.JobHandler;
import org.activiti.engine.impl.jobexecutor.ProcessEventJobHandler;
......@@ -275,6 +277,8 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
protected Map<String, EventHandler> eventHandlers;
protected List<EventHandler> customEventHandlers;
protected FailedJobCommandFactory failedJobCommandFactory;
// buildProcessEngine ///////////////////////////////////////////////////////
......@@ -309,6 +313,15 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
initJpa();
initDelegateInterceptor();
initEventHandlers();
initFailedJobCommandFactory();
}
// failedJobCommandFactory ////////////////////////////////////////////////////////
protected void initFailedJobCommandFactory() {
if (failedJobCommandFactory == null) {
failedJobCommandFactory = new DefaultFailedJobCommandFactory();
}
}
// command executors ////////////////////////////////////////////////////////
......@@ -1570,5 +1583,14 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
public void setCustomEventHandlers(List<EventHandler> customEventHandlers) {
this.customEventHandlers = customEventHandlers;
}
public FailedJobCommandFactory getFailedJobCommandFactory() {
return failedJobCommandFactory;
}
public ProcessEngineConfigurationImpl setFailedJobCommandFactory(FailedJobCommandFactory failedJobCommandFactory) {
this.failedJobCommandFactory = failedJobCommandFactory;
return this;
}
}
\ No newline at end of file
......@@ -22,7 +22,7 @@ import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.interceptor.Command;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.interceptor.CommandExecutor;
import org.activiti.engine.impl.jobexecutor.DecrementJobRetriesListener;
import org.activiti.engine.impl.jobexecutor.FailedJobListener;
import org.activiti.engine.impl.jobexecutor.JobExecutorContext;
import org.activiti.engine.impl.persistence.entity.JobEntity;
......@@ -73,7 +73,7 @@ public class ExecuteJobsCmd implements Command<Object>, Serializable {
commandContext.getTransactionContext().addTransactionListener(
TransactionState.ROLLED_BACK,
new DecrementJobRetriesListener(commandExecutor, jobId, exception));
new FailedJobListener(commandExecutor, jobId, exception));
// throw the original exception to indicate the ExecuteJobCmd failed
throw exception;
......
......@@ -181,4 +181,10 @@ public class HistoryParseListener implements BpmnParseListener {
public void parseIntermediateThrowEvent(Element intermediateEventElement, ScopeImpl scope, ActivityImpl activity) {
}
public void parseIntermediateCatchEvent(Element intermediateEventElement, ScopeImpl scope, ActivityImpl activity) {
}
public void parseBoundaryEvent(Element boundaryEventElement, ScopeImpl scopeElement, ActivityImpl nestedActivity) {
}
}
package org.activiti.engine.impl.jobexecutor;
import org.activiti.engine.impl.cmd.DecrementJobRetriesCmd;
import org.activiti.engine.impl.interceptor.Command;
public class DefaultFailedJobCommandFactory implements FailedJobCommandFactory {
public Command<Object> getCommand(String jobId, Throwable exception) {
return new DecrementJobRetriesCmd(jobId, exception);
}
}
package org.activiti.engine.impl.jobexecutor;
import org.activiti.engine.impl.interceptor.Command;
public interface FailedJobCommandFactory {
public Command<Object> getCommand(String jobId, Throwable exception);
}
......@@ -14,7 +14,7 @@
package org.activiti.engine.impl.jobexecutor;
import org.activiti.engine.impl.cfg.TransactionListener;
import org.activiti.engine.impl.cmd.DecrementJobRetriesCmd;
import org.activiti.engine.impl.context.Context;
import org.activiti.engine.impl.interceptor.CommandContext;
import org.activiti.engine.impl.interceptor.CommandExecutor;
......@@ -22,20 +22,22 @@ import org.activiti.engine.impl.interceptor.CommandExecutor;
/**
* @author Frederik Heremans
*/
public class DecrementJobRetriesListener implements TransactionListener {
public class FailedJobListener implements TransactionListener {
protected CommandExecutor commandExecutor;
protected String jobId;
protected Throwable exception;
public DecrementJobRetriesListener(CommandExecutor commandExecutor, String jobId, Throwable exception) {
public FailedJobListener(CommandExecutor commandExecutor, String jobId, Throwable exception) {
this.commandExecutor = commandExecutor;
this.jobId = jobId;
this.exception = exception;
}
public void execute(CommandContext commandContext) {
commandExecutor.execute(new DecrementJobRetriesCmd(jobId, exception));
commandExecutor.execute(Context.getProcessEngineConfiguration()
.getFailedJobCommandFactory()
.getCommand(jobId, exception));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册