提交 b0b267ea 编写于 作者: M martin.grofcik

AsyncRunnableExecutionExceptionHandler refactoring

上级 c18a2024
......@@ -452,7 +452,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
protected List<JobHandler> customJobHandlers;
protected Map<String, JobHandler> jobHandlers;
protected AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler;
protected List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers;
protected List<HistoryJobHandler> customHistoryJobHandlers;
protected Map<String, HistoryJobHandler> historyJobHandlers;
......@@ -738,7 +738,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
// agenda factory
protected FlowableEngineAgendaFactory agendaFactory;
protected DbSchemaManager variableDbSchemaManager;
protected DbSchemaManager taskDbSchemaManager;
......@@ -1307,7 +1307,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
this.jobServiceConfiguration.setJobHandlers(this.jobHandlers);
this.jobServiceConfiguration.setHistoryJobHandlers(this.historyJobHandlers);
this.jobServiceConfiguration.setFailedJobCommandFactory(this.failedJobCommandFactory);
this.jobServiceConfiguration.setAsyncRunnableExecutionExceptionHandler(this.asyncRunnableExecutionExceptionHandler);
this.jobServiceConfiguration.setAsyncRunnableExecutionExceptionHandlers(this.asyncRunnableExecutionExceptionHandlers);
this.jobServiceConfiguration.setAsyncExecutorNumberOfRetries(this.asyncExecutorNumberOfRetries);
this.jobServiceConfiguration.setAsyncExecutorResetExpiredJobsMaxTimeout(this.asyncExecutorResetExpiredJobsMaxTimeout);
......@@ -3280,14 +3280,14 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
this.candidateManager = candidateManager;
}
public AsyncRunnableExecutionExceptionHandler getAsyncRunnableExecutionExceptionHandler() {
return asyncRunnableExecutionExceptionHandler;
public List<AsyncRunnableExecutionExceptionHandler> getAsyncRunnableExecutionExceptionHandlers() {
return asyncRunnableExecutionExceptionHandlers;
}
public ProcessEngineConfigurationImpl setAsyncRunnableExecutionExceptionHandler(
AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
public ProcessEngineConfigurationImpl setAsyncRunnableExecutionExceptionHandlers(
List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers) {
this.asyncRunnableExecutionExceptionHandler = asyncRunnableExecutionExceptionHandler;
this.asyncRunnableExecutionExceptionHandlers = asyncRunnableExecutionExceptionHandlers;
return this;
}
......@@ -3402,7 +3402,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
this.processInstanceStateChangedCallbacks = processInstanceStateChangedCallbacks;
return this;
}
public DbSchemaManager getVariableDbSchemaManager() {
return variableDbSchemaManager;
}
......@@ -3411,7 +3411,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
this.variableDbSchemaManager = variableDbSchemaManager;
return this;
}
public DbSchemaManager getTaskDbSchemaManager() {
return taskDbSchemaManager;
}
......
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -12,6 +12,8 @@
*/
package org.flowable.job.service;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.flowable.engine.common.AbstractServiceConfiguration;
......@@ -24,6 +26,7 @@ import org.flowable.job.service.impl.JobServiceImpl;
import org.flowable.job.service.impl.TimerJobServiceImpl;
import org.flowable.job.service.impl.asyncexecutor.AsyncExecutor;
import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler;
import org.flowable.job.service.impl.asyncexecutor.DefaultAsyncRunnableExecutionExceptionHandler;
import org.flowable.job.service.impl.asyncexecutor.DefaultJobManager;
import org.flowable.job.service.impl.asyncexecutor.FailedJobCommandFactory;
import org.flowable.job.service.impl.asyncexecutor.JobManager;
......@@ -62,16 +65,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class JobServiceConfiguration extends AbstractServiceConfiguration {
protected static final Logger LOGGER = LoggerFactory.getLogger(JobServiceConfiguration.class);
// SERVICES
// /////////////////////////////////////////////////////////////////
protected JobService jobService = new JobServiceImpl(this);
protected TimerJobService timerJobService = new TimerJobServiceImpl(this);
protected HistoryJobService historyJobService = new HistoryJobServiceImpl(this);
protected JobManager jobManager;
// DATA MANAGERS ///////////////////////////////////////////////////
protected JobDataManager jobDataManager;
......@@ -82,34 +85,35 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
protected JobByteArrayDataManager jobByteArrayDataManager;
// ENTITY MANAGERS /////////////////////////////////////////////////
protected JobEntityManager jobEntityManager;
protected DeadLetterJobEntityManager deadLetterJobEntityManager;
protected SuspendedJobEntityManager suspendedJobEntityManager;
protected TimerJobEntityManager timerJobEntityManager;
protected HistoryJobEntityManager historyJobEntityManager;
protected JobByteArrayEntityManager jobByteArrayEntityManager;
protected CommandExecutor commandExecutor;
protected ExpressionManager expressionManager;
protected BusinessCalendarManager businessCalendarManager;
protected HistoryLevel historyLevel;
protected InternalJobManager internalJobManager;
protected AsyncExecutor asyncExecutor;
protected Map<String, JobHandler> jobHandlers;
protected FailedJobCommandFactory failedJobCommandFactory;
protected AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler;
protected List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers =
Collections.<AsyncRunnableExecutionExceptionHandler>singletonList(new DefaultAsyncRunnableExecutionExceptionHandler());;
protected Map<String, HistoryJobHandler> historyJobHandlers;
protected int asyncExecutorNumberOfRetries;
protected int asyncExecutorResetExpiredJobsMaxTimeout;
protected ObjectMapper objectMapper;
// init
......@@ -120,7 +124,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
initDataManagers();
initEntityManagers();
}
@Override
public boolean isHistoryLevelAtLeast(HistoryLevel level) {
if (LOGGER.isDebugEnabled()) {
......@@ -137,7 +141,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
}
return historyLevel != HistoryLevel.NONE;
}
// Job manager ///////////////////////////////////////////////////////////
public void initJobManager() {
......@@ -199,7 +203,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
public JobServiceConfiguration getIdentityLinkServiceConfiguration() {
return this;
}
public JobService getJobService() {
return jobService;
}
......@@ -208,7 +212,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
this.jobService = jobService;
return this;
}
public TimerJobService getTimerJobService() {
return timerJobService;
}
......@@ -217,7 +221,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
this.timerJobService = timerJobService;
return this;
}
public HistoryJobService getHistoryJobService() {
return historyJobService;
}
......@@ -234,7 +238,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
public void setJobManager(JobManager jobManager) {
this.jobManager = jobManager;
}
public JobDataManager getJobDataManager() {
return jobDataManager;
}
......@@ -331,7 +335,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
public JobServiceConfiguration setHistoryJobEntityManager(HistoryJobEntityManager historyJobEntityManager) {
this.historyJobEntityManager = historyJobEntityManager;
return this;
return this;
}
public JobByteArrayEntityManager getJobByteArrayEntityManager() {
......@@ -355,13 +359,13 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
public HistoryLevel getHistoryLevel() {
return historyLevel;
}
@Override
public JobServiceConfiguration setHistoryLevel(HistoryLevel historyLevel) {
this.historyLevel = historyLevel;
return this;
}
public InternalJobManager getInternalJobManager() {
return internalJobManager;
}
......@@ -415,12 +419,14 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
return this;
}
public AsyncRunnableExecutionExceptionHandler getAsyncRunnableExecutionExceptionHandler() {
return asyncRunnableExecutionExceptionHandler;
public List<AsyncRunnableExecutionExceptionHandler> getAsyncRunnableExecutionExceptionHandlers() {
return asyncRunnableExecutionExceptionHandlers;
}
public JobServiceConfiguration setAsyncRunnableExecutionExceptionHandler(AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
this.asyncRunnableExecutionExceptionHandler = asyncRunnableExecutionExceptionHandler;
public JobServiceConfiguration setAsyncRunnableExecutionExceptionHandlers(List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers) {
if (asyncRunnableExecutionExceptionHandlers != null) {
this.asyncRunnableExecutionExceptionHandlers = asyncRunnableExecutionExceptionHandlers;
}
return this;
}
......
package org.flowable.job.service.impl.asyncexecutor;
import org.flowable.engine.common.api.delegate.event.FlowableEngineEventType;
import org.flowable.engine.common.impl.interceptor.Command;
import org.flowable.engine.common.impl.interceptor.CommandConfig;
import org.flowable.engine.common.impl.interceptor.CommandContext;
import org.flowable.job.service.JobInfo;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.event.impl.FlowableJobEventBuilder;
import org.flowable.job.service.impl.persistence.entity.AbstractRuntimeJobEntity;
import org.flowable.job.service.impl.util.CommandContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author martin.grofcik
*/
public class DefaultAsyncRunnableExecutionExceptionHandler implements AsyncRunnableExecutionExceptionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAsyncRunnableExecutionExceptionHandler.class);
@Override
public boolean handleException(final JobServiceConfiguration jobServiceConfiguration, final JobInfo job, final Throwable exception) {
jobServiceConfiguration.getCommandExecutor().execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
// Finally, Throw the exception to indicate the ExecuteAsyncJobCmd failed
String message = "Job " + job.getId() + " failed";
LOGGER.error(message, exception);
if (job instanceof AbstractRuntimeJobEntity) {
AbstractRuntimeJobEntity runtimeJob = (AbstractRuntimeJobEntity) job;
if (runtimeJob.getProcessDefinitionId() != null && jobServiceConfiguration.getInternalJobManager() != null &&
jobServiceConfiguration.getInternalJobManager().isFlowable5ProcessDefinitionId(runtimeJob.getProcessDefinitionId())) {
jobServiceConfiguration.getInternalJobManager().handleFailedJob(runtimeJob, exception);
return null;
}
}
CommandConfig commandConfig = jobServiceConfiguration.getCommandExecutor().getDefaultConfig().transactionRequiresNew();
FailedJobCommandFactory failedJobCommandFactory = jobServiceConfiguration.getFailedJobCommandFactory();
Command<Object> cmd = failedJobCommandFactory.getCommand(job.getId(), exception);
LOGGER.trace("Using FailedJobCommandFactory '{}' and command of type '{}'", failedJobCommandFactory.getClass(), cmd.getClass());
jobServiceConfiguration.getCommandExecutor().execute(commandConfig, cmd);
// Dispatch an event, indicating job execution failed in a
// try-catch block, to prevent the original exception to be swallowed
if (CommandContextUtil.getEventDispatcher().isEnabled()) {
try {
CommandContextUtil.getEventDispatcher().dispatchEvent(FlowableJobEventBuilder.createEntityExceptionEvent(FlowableEngineEventType.JOB_EXECUTION_FAILURE, job, exception));
} catch (Throwable ignore) {
LOGGER.warn("Exception occurred while dispatching job failure event, ignoring.", ignore);
}
}
return null;
}
});
return true;
}
}
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -13,15 +13,12 @@
package org.flowable.job.service.impl.asyncexecutor;
import org.flowable.engine.common.api.FlowableOptimisticLockingException;
import org.flowable.engine.common.api.delegate.event.FlowableEngineEventType;
import org.flowable.engine.common.impl.context.Context;
import org.flowable.engine.common.impl.interceptor.Command;
import org.flowable.engine.common.impl.interceptor.CommandConfig;
import org.flowable.engine.common.impl.interceptor.CommandContext;
import org.flowable.job.service.Job;
import org.flowable.job.service.JobInfo;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.event.impl.FlowableJobEventBuilder;
import org.flowable.job.service.impl.cmd.ExecuteAsyncJobCmd;
import org.flowable.job.service.impl.cmd.LockExclusiveJobCmd;
import org.flowable.job.service.impl.cmd.UnlockExclusiveJobCmd;
......@@ -32,6 +29,9 @@ import org.flowable.job.service.impl.util.CommandContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* @author Joram Barrez
* @author Tijs Rademakers
......@@ -44,25 +44,35 @@ public class ExecuteAsyncRunnable implements Runnable {
protected JobInfo job;
protected JobServiceConfiguration jobServiceConfiguration;
protected JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager;
protected AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler;
protected List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers;
public ExecuteAsyncRunnable(String jobId, JobServiceConfiguration jobServiceConfiguration,
public ExecuteAsyncRunnable(String jobId, JobServiceConfiguration jobServiceConfiguration,
JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager,
AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
this.jobId = jobId;
this.jobServiceConfiguration = jobServiceConfiguration;
this.jobEntityManager = jobEntityManager;
this.asyncRunnableExecutionExceptionHandler = asyncRunnableExecutionExceptionHandler;
initialize(jobId, null, jobServiceConfiguration, jobEntityManager, asyncRunnableExecutionExceptionHandler);
}
public ExecuteAsyncRunnable(JobInfo job, JobServiceConfiguration jobServiceConfiguration,
JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager,
AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
public ExecuteAsyncRunnable(JobInfo job, JobServiceConfiguration jobServiceConfiguration,
JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager,
AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
initialize(job.getId(), job, jobServiceConfiguration, jobEntityManager, asyncRunnableExecutionExceptionHandler);
}
private void initialize(String jobId, JobInfo job, JobServiceConfiguration jobServiceConfiguration, JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager, AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
this.job = job;
this.jobId = job.getId();
this.jobId = jobId;
this.jobServiceConfiguration = jobServiceConfiguration;
this.jobEntityManager = jobEntityManager;
this.asyncRunnableExecutionExceptionHandler = asyncRunnableExecutionExceptionHandler;
this.asyncRunnableExecutionExceptionHandlers = initializeExceptionHandlers(jobServiceConfiguration, asyncRunnableExecutionExceptionHandler);
}
private List<AsyncRunnableExecutionExceptionHandler> initializeExceptionHandlers(JobServiceConfiguration jobServiceConfiguration, AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers = new ArrayList<>();
if (asyncRunnableExecutionExceptionHandler != null) {
asyncRunnableExecutionExceptionHandlers.add(asyncRunnableExecutionExceptionHandler);
}
asyncRunnableExecutionExceptionHandlers.addAll(jobServiceConfiguration.getAsyncRunnableExecutionExceptionHandlers());
return asyncRunnableExecutionExceptionHandlers;
}
@Override
......@@ -76,29 +86,29 @@ public class ExecuteAsyncRunnable implements Runnable {
}
});
}
if (job instanceof Job) {
Job jobObject = (Job) job;
if (jobServiceConfiguration.getInternalJobManager() != null &&
if (jobServiceConfiguration.getInternalJobManager() != null &&
jobServiceConfiguration.getInternalJobManager().isFlowable5ProcessDefinitionId(jobObject.getProcessDefinitionId())) {
jobServiceConfiguration.getInternalJobManager().executeV5JobWithLockAndRetry(jobObject);
return;
}
}
if (job instanceof AbstractRuntimeJobEntity) {
boolean lockNotNeededOrSuccess = lockJobIfNeeded();
if (lockNotNeededOrSuccess) {
executeJob();
unlockJobIfNeeded();
}
} else { // history jobs
executeJob();
}
}
......@@ -183,60 +193,14 @@ public class ExecuteAsyncRunnable implements Runnable {
}
protected void handleFailedJob(final Throwable exception) {
AsyncRunnableExecutionExceptionHandler exceptionHandler;
if (asyncRunnableExecutionExceptionHandler != null) {
exceptionHandler = asyncRunnableExecutionExceptionHandler;
} else {
exceptionHandler = jobServiceConfiguration.getAsyncRunnableExecutionExceptionHandler();
}
if (exceptionHandler != null && exceptionHandler.handleException(jobServiceConfiguration, job, exception)) {
return;
}
defaultHandleFailedJob(exception);
}
protected void defaultHandleFailedJob(final Throwable exception) {
jobServiceConfiguration.getCommandExecutor().execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
// Finally, Throw the exception to indicate the ExecuteAsyncJobCmd failed
String message = "Job " + jobId + " failed";
LOGGER.error(message, exception);
if (job instanceof AbstractRuntimeJobEntity) {
AbstractRuntimeJobEntity runtimeJob = (AbstractRuntimeJobEntity) job;
if (runtimeJob.getProcessDefinitionId() != null && jobServiceConfiguration.getInternalJobManager() != null &&
jobServiceConfiguration.getInternalJobManager().isFlowable5ProcessDefinitionId(runtimeJob.getProcessDefinitionId())) {
jobServiceConfiguration.getInternalJobManager().handleFailedJob(runtimeJob, exception);
return null;
}
}
CommandConfig commandConfig = jobServiceConfiguration.getCommandExecutor().getDefaultConfig().transactionRequiresNew();
FailedJobCommandFactory failedJobCommandFactory = jobServiceConfiguration.getFailedJobCommandFactory();
Command<Object> cmd = failedJobCommandFactory.getCommand(job.getId(), exception);
LOGGER.trace("Using FailedJobCommandFactory '{}' and command of type '{}'", failedJobCommandFactory.getClass(), cmd.getClass());
jobServiceConfiguration.getCommandExecutor().execute(commandConfig, cmd);
// Dispatch an event, indicating job execution failed in a
// try-catch block, to prevent the original exception to be swallowed
if (CommandContextUtil.getEventDispatcher().isEnabled()) {
try {
CommandContextUtil.getEventDispatcher().dispatchEvent(FlowableJobEventBuilder.createEntityExceptionEvent(FlowableEngineEventType.JOB_EXECUTION_FAILURE, job, exception));
} catch (Throwable ignore) {
LOGGER.warn("Exception occurred while dispatching job failure event, ignoring.", ignore);
}
}
return null;
for (AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler : asyncRunnableExecutionExceptionHandlers) {
if (asyncRunnableExecutionExceptionHandler.handleException(this.jobServiceConfiguration, this.job, exception)) {
return;
}
});
}
LOGGER.error("Unable to handle exception {} for job {}.", exception, job);
throw new RuntimeException("Unable to handle exception "+exception.getMessage()+" for job "+job.getId()+".", exception);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册