提交 e4c4d0a6 编写于 作者: T Tijs Rademakers

Small changes to PR #594

......@@ -283,6 +283,7 @@ import org.flowable.job.service.impl.asyncexecutor.AsyncExecutor;
import org.flowable.job.service.impl.asyncexecutor.AsyncRunnableExecutionExceptionHandler;
import org.flowable.job.service.impl.asyncexecutor.DefaultAsyncHistoryJobExecutor;
import org.flowable.job.service.impl.asyncexecutor.DefaultAsyncJobExecutor;
import org.flowable.job.service.impl.asyncexecutor.DefaultAsyncRunnableExecutionExceptionHandler;
import org.flowable.job.service.impl.asyncexecutor.ExecuteAsyncRunnableFactory;
import org.flowable.job.service.impl.asyncexecutor.FailedJobCommandFactory;
import org.flowable.job.service.impl.asyncexecutor.JobManager;
......@@ -454,7 +455,8 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
protected List<JobHandler> customJobHandlers;
protected Map<String, JobHandler> jobHandlers;
protected AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler;
protected List<AsyncRunnableExecutionExceptionHandler> customAsyncRunnableExecutionExceptionHandlers;
protected boolean addDefaultExceptionHandler = true;
protected List<HistoryJobHandler> customHistoryJobHandlers;
protected Map<String, HistoryJobHandler> historyJobHandlers;
......@@ -1325,7 +1327,17 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
this.jobServiceConfiguration.setJobHandlers(this.jobHandlers);
this.jobServiceConfiguration.setHistoryJobHandlers(this.historyJobHandlers);
this.jobServiceConfiguration.setFailedJobCommandFactory(this.failedJobCommandFactory);
this.jobServiceConfiguration.setAsyncRunnableExecutionExceptionHandler(this.asyncRunnableExecutionExceptionHandler);
List<AsyncRunnableExecutionExceptionHandler> exceptionHandlers = new ArrayList<>();
if (customAsyncRunnableExecutionExceptionHandlers != null) {
exceptionHandlers.addAll(customAsyncRunnableExecutionExceptionHandlers);
}
if (addDefaultExceptionHandler) {
exceptionHandlers.add(new DefaultAsyncRunnableExecutionExceptionHandler());
}
this.jobServiceConfiguration.setAsyncRunnableExecutionExceptionHandlers(exceptionHandlers);
this.jobServiceConfiguration.setAsyncExecutorNumberOfRetries(this.asyncExecutorNumberOfRetries);
this.jobServiceConfiguration.setAsyncExecutorResetExpiredJobsMaxTimeout(this.asyncExecutorResetExpiredJobsMaxTimeout);
......@@ -3298,14 +3310,23 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
this.candidateManager = candidateManager;
}
public AsyncRunnableExecutionExceptionHandler getAsyncRunnableExecutionExceptionHandler() {
return asyncRunnableExecutionExceptionHandler;
public List<AsyncRunnableExecutionExceptionHandler> getCustomAsyncRunnableExecutionExceptionHandlers() {
return customAsyncRunnableExecutionExceptionHandlers;
}
public ProcessEngineConfigurationImpl setAsyncRunnableExecutionExceptionHandler(
AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
public ProcessEngineConfigurationImpl setCustomAsyncRunnableExecutionExceptionHandlers(
List<AsyncRunnableExecutionExceptionHandler> customAsyncRunnableExecutionExceptionHandlers) {
this.asyncRunnableExecutionExceptionHandler = asyncRunnableExecutionExceptionHandler;
this.customAsyncRunnableExecutionExceptionHandlers = customAsyncRunnableExecutionExceptionHandlers;
return this;
}
public boolean isAddDefaultExceptionHandler() {
return addDefaultExceptionHandler;
}
public ProcessEngineConfigurationImpl setAddDefaultExceptionHandler(boolean addDefaultExceptionHandler) {
this.addDefaultExceptionHandler = addDefaultExceptionHandler;
return this;
}
......@@ -3420,7 +3441,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
this.processInstanceStateChangedCallbacks = processInstanceStateChangedCallbacks;
return this;
}
public DbSchemaManager getVariableDbSchemaManager() {
return variableDbSchemaManager;
}
......@@ -3429,7 +3450,7 @@ public abstract class ProcessEngineConfigurationImpl extends ProcessEngineConfig
this.variableDbSchemaManager = variableDbSchemaManager;
return this;
}
public DbSchemaManager getTaskDbSchemaManager() {
return taskDbSchemaManager;
}
......
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -12,6 +12,7 @@
*/
package org.flowable.job.service;
import java.util.List;
import java.util.Map;
import org.flowable.engine.common.AbstractServiceConfiguration;
......@@ -62,16 +63,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class JobServiceConfiguration extends AbstractServiceConfiguration {
protected static final Logger LOGGER = LoggerFactory.getLogger(JobServiceConfiguration.class);
// SERVICES
// /////////////////////////////////////////////////////////////////
protected JobService jobService = new JobServiceImpl(this);
protected TimerJobService timerJobService = new TimerJobServiceImpl(this);
protected HistoryJobService historyJobService = new HistoryJobServiceImpl(this);
protected JobManager jobManager;
// DATA MANAGERS ///////////////////////////////////////////////////
protected JobDataManager jobDataManager;
......@@ -82,34 +83,34 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
protected JobByteArrayDataManager jobByteArrayDataManager;
// ENTITY MANAGERS /////////////////////////////////////////////////
protected JobEntityManager jobEntityManager;
protected DeadLetterJobEntityManager deadLetterJobEntityManager;
protected SuspendedJobEntityManager suspendedJobEntityManager;
protected TimerJobEntityManager timerJobEntityManager;
protected HistoryJobEntityManager historyJobEntityManager;
protected JobByteArrayEntityManager jobByteArrayEntityManager;
protected CommandExecutor commandExecutor;
protected ExpressionManager expressionManager;
protected BusinessCalendarManager businessCalendarManager;
protected HistoryLevel historyLevel;
protected InternalJobManager internalJobManager;
protected AsyncExecutor asyncExecutor;
protected Map<String, JobHandler> jobHandlers;
protected FailedJobCommandFactory failedJobCommandFactory;
protected AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler;
protected List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers;
protected Map<String, HistoryJobHandler> historyJobHandlers;
protected int asyncExecutorNumberOfRetries;
protected int asyncExecutorResetExpiredJobsMaxTimeout;
protected ObjectMapper objectMapper;
// init
......@@ -120,7 +121,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
initDataManagers();
initEntityManagers();
}
@Override
public boolean isHistoryLevelAtLeast(HistoryLevel level) {
if (LOGGER.isDebugEnabled()) {
......@@ -137,7 +138,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
}
return historyLevel != HistoryLevel.NONE;
}
// Job manager ///////////////////////////////////////////////////////////
public void initJobManager() {
......@@ -199,7 +200,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
public JobServiceConfiguration getIdentityLinkServiceConfiguration() {
return this;
}
public JobService getJobService() {
return jobService;
}
......@@ -208,7 +209,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
this.jobService = jobService;
return this;
}
public TimerJobService getTimerJobService() {
return timerJobService;
}
......@@ -217,7 +218,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
this.timerJobService = timerJobService;
return this;
}
public HistoryJobService getHistoryJobService() {
return historyJobService;
}
......@@ -234,7 +235,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
public void setJobManager(JobManager jobManager) {
this.jobManager = jobManager;
}
public JobDataManager getJobDataManager() {
return jobDataManager;
}
......@@ -331,7 +332,7 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
public JobServiceConfiguration setHistoryJobEntityManager(HistoryJobEntityManager historyJobEntityManager) {
this.historyJobEntityManager = historyJobEntityManager;
return this;
return this;
}
public JobByteArrayEntityManager getJobByteArrayEntityManager() {
......@@ -355,13 +356,13 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
public HistoryLevel getHistoryLevel() {
return historyLevel;
}
@Override
public JobServiceConfiguration setHistoryLevel(HistoryLevel historyLevel) {
this.historyLevel = historyLevel;
return this;
}
public InternalJobManager getInternalJobManager() {
return internalJobManager;
}
......@@ -415,12 +416,12 @@ public class JobServiceConfiguration extends AbstractServiceConfiguration {
return this;
}
public AsyncRunnableExecutionExceptionHandler getAsyncRunnableExecutionExceptionHandler() {
return asyncRunnableExecutionExceptionHandler;
public List<AsyncRunnableExecutionExceptionHandler> getAsyncRunnableExecutionExceptionHandlers() {
return asyncRunnableExecutionExceptionHandlers;
}
public JobServiceConfiguration setAsyncRunnableExecutionExceptionHandler(AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
this.asyncRunnableExecutionExceptionHandler = asyncRunnableExecutionExceptionHandler;
public JobServiceConfiguration setAsyncRunnableExecutionExceptionHandlers(List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers) {
this.asyncRunnableExecutionExceptionHandlers = asyncRunnableExecutionExceptionHandlers;
return this;
}
......
package org.flowable.job.service.impl.asyncexecutor;
import org.flowable.engine.common.api.delegate.event.FlowableEngineEventType;
import org.flowable.engine.common.impl.interceptor.Command;
import org.flowable.engine.common.impl.interceptor.CommandConfig;
import org.flowable.engine.common.impl.interceptor.CommandContext;
import org.flowable.job.service.JobInfo;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.event.impl.FlowableJobEventBuilder;
import org.flowable.job.service.impl.persistence.entity.AbstractRuntimeJobEntity;
import org.flowable.job.service.impl.util.CommandContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author martin.grofcik
*/
public class DefaultAsyncRunnableExecutionExceptionHandler implements AsyncRunnableExecutionExceptionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAsyncRunnableExecutionExceptionHandler.class);
@Override
public boolean handleException(final JobServiceConfiguration jobServiceConfiguration, final JobInfo job, final Throwable exception) {
jobServiceConfiguration.getCommandExecutor().execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
// Finally, Throw the exception to indicate the ExecuteAsyncJobCmd failed
String message = "Job " + job.getId() + " failed";
LOGGER.error(message, exception);
if (job instanceof AbstractRuntimeJobEntity) {
AbstractRuntimeJobEntity runtimeJob = (AbstractRuntimeJobEntity) job;
if (runtimeJob.getProcessDefinitionId() != null && jobServiceConfiguration.getInternalJobManager() != null &&
jobServiceConfiguration.getInternalJobManager().isFlowable5ProcessDefinitionId(runtimeJob.getProcessDefinitionId())) {
jobServiceConfiguration.getInternalJobManager().handleFailedJob(runtimeJob, exception);
return null;
}
}
CommandConfig commandConfig = jobServiceConfiguration.getCommandExecutor().getDefaultConfig().transactionRequiresNew();
FailedJobCommandFactory failedJobCommandFactory = jobServiceConfiguration.getFailedJobCommandFactory();
Command<Object> cmd = failedJobCommandFactory.getCommand(job.getId(), exception);
LOGGER.trace("Using FailedJobCommandFactory '{}' and command of type '{}'", failedJobCommandFactory.getClass(), cmd.getClass());
jobServiceConfiguration.getCommandExecutor().execute(commandConfig, cmd);
// Dispatch an event, indicating job execution failed in a
// try-catch block, to prevent the original exception to be swallowed
if (CommandContextUtil.getEventDispatcher().isEnabled()) {
try {
CommandContextUtil.getEventDispatcher().dispatchEvent(FlowableJobEventBuilder.createEntityExceptionEvent(FlowableEngineEventType.JOB_EXECUTION_FAILURE, job, exception));
} catch (Throwable ignore) {
LOGGER.warn("Exception occurred while dispatching job failure event, ignoring.", ignore);
}
}
return null;
}
});
return true;
}
}
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
......@@ -12,16 +12,17 @@
*/
package org.flowable.job.service.impl.asyncexecutor;
import java.util.ArrayList;
import java.util.List;
import org.flowable.engine.common.api.FlowableException;
import org.flowable.engine.common.api.FlowableOptimisticLockingException;
import org.flowable.engine.common.api.delegate.event.FlowableEngineEventType;
import org.flowable.engine.common.impl.context.Context;
import org.flowable.engine.common.impl.interceptor.Command;
import org.flowable.engine.common.impl.interceptor.CommandConfig;
import org.flowable.engine.common.impl.interceptor.CommandContext;
import org.flowable.job.service.Job;
import org.flowable.job.service.JobInfo;
import org.flowable.job.service.JobServiceConfiguration;
import org.flowable.job.service.event.impl.FlowableJobEventBuilder;
import org.flowable.job.service.impl.cmd.ExecuteAsyncJobCmd;
import org.flowable.job.service.impl.cmd.LockExclusiveJobCmd;
import org.flowable.job.service.impl.cmd.UnlockExclusiveJobCmd;
......@@ -44,25 +45,41 @@ public class ExecuteAsyncRunnable implements Runnable {
protected JobInfo job;
protected JobServiceConfiguration jobServiceConfiguration;
protected JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager;
protected AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler;
protected List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers;
public ExecuteAsyncRunnable(String jobId, JobServiceConfiguration jobServiceConfiguration,
public ExecuteAsyncRunnable(String jobId, JobServiceConfiguration jobServiceConfiguration,
JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager,
AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
this.jobId = jobId;
this.jobServiceConfiguration = jobServiceConfiguration;
this.jobEntityManager = jobEntityManager;
this.asyncRunnableExecutionExceptionHandler = asyncRunnableExecutionExceptionHandler;
initialize(jobId, null, jobServiceConfiguration, jobEntityManager, asyncRunnableExecutionExceptionHandler);
}
public ExecuteAsyncRunnable(JobInfo job, JobServiceConfiguration jobServiceConfiguration,
JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager,
AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
public ExecuteAsyncRunnable(JobInfo job, JobServiceConfiguration jobServiceConfiguration,
JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager,
AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
initialize(job.getId(), job, jobServiceConfiguration, jobEntityManager, asyncRunnableExecutionExceptionHandler);
}
private void initialize(String jobId, JobInfo job, JobServiceConfiguration jobServiceConfiguration, JobInfoEntityManager<? extends JobInfoEntity> jobEntityManager, AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
this.job = job;
this.jobId = job.getId();
this.jobId = jobId;
this.jobServiceConfiguration = jobServiceConfiguration;
this.jobEntityManager = jobEntityManager;
this.asyncRunnableExecutionExceptionHandler = asyncRunnableExecutionExceptionHandler;
this.asyncRunnableExecutionExceptionHandlers = initializeExceptionHandlers(jobServiceConfiguration, asyncRunnableExecutionExceptionHandler);
}
private List<AsyncRunnableExecutionExceptionHandler> initializeExceptionHandlers(JobServiceConfiguration jobServiceConfiguration, AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler) {
List<AsyncRunnableExecutionExceptionHandler> asyncRunnableExecutionExceptionHandlers = new ArrayList<>();
if (asyncRunnableExecutionExceptionHandler != null) {
asyncRunnableExecutionExceptionHandlers.add(asyncRunnableExecutionExceptionHandler);
}
if (jobServiceConfiguration.getAsyncRunnableExecutionExceptionHandlers() != null) {
asyncRunnableExecutionExceptionHandlers.addAll(jobServiceConfiguration.getAsyncRunnableExecutionExceptionHandlers());
}
return asyncRunnableExecutionExceptionHandlers;
}
@Override
......@@ -76,29 +93,29 @@ public class ExecuteAsyncRunnable implements Runnable {
}
});
}
if (job instanceof Job) {
Job jobObject = (Job) job;
if (jobServiceConfiguration.getInternalJobManager() != null &&
if (jobServiceConfiguration.getInternalJobManager() != null &&
jobServiceConfiguration.getInternalJobManager().isFlowable5ProcessDefinitionId(jobObject.getProcessDefinitionId())) {
jobServiceConfiguration.getInternalJobManager().executeV5JobWithLockAndRetry(jobObject);
return;
}
}
if (job instanceof AbstractRuntimeJobEntity) {
boolean lockNotNeededOrSuccess = lockJobIfNeeded();
if (lockNotNeededOrSuccess) {
executeJob();
unlockJobIfNeeded();
}
} else { // history jobs
executeJob();
}
}
......@@ -183,60 +200,14 @@ public class ExecuteAsyncRunnable implements Runnable {
}
protected void handleFailedJob(final Throwable exception) {
AsyncRunnableExecutionExceptionHandler exceptionHandler;
if (asyncRunnableExecutionExceptionHandler != null) {
exceptionHandler = asyncRunnableExecutionExceptionHandler;
} else {
exceptionHandler = jobServiceConfiguration.getAsyncRunnableExecutionExceptionHandler();
}
if (exceptionHandler != null && exceptionHandler.handleException(jobServiceConfiguration, job, exception)) {
return;
}
defaultHandleFailedJob(exception);
}
protected void defaultHandleFailedJob(final Throwable exception) {
jobServiceConfiguration.getCommandExecutor().execute(new Command<Void>() {
@Override
public Void execute(CommandContext commandContext) {
// Finally, Throw the exception to indicate the ExecuteAsyncJobCmd failed
String message = "Job " + jobId + " failed";
LOGGER.error(message, exception);
if (job instanceof AbstractRuntimeJobEntity) {
AbstractRuntimeJobEntity runtimeJob = (AbstractRuntimeJobEntity) job;
if (runtimeJob.getProcessDefinitionId() != null && jobServiceConfiguration.getInternalJobManager() != null &&
jobServiceConfiguration.getInternalJobManager().isFlowable5ProcessDefinitionId(runtimeJob.getProcessDefinitionId())) {
jobServiceConfiguration.getInternalJobManager().handleFailedJob(runtimeJob, exception);
return null;
}
}
CommandConfig commandConfig = jobServiceConfiguration.getCommandExecutor().getDefaultConfig().transactionRequiresNew();
FailedJobCommandFactory failedJobCommandFactory = jobServiceConfiguration.getFailedJobCommandFactory();
Command<Object> cmd = failedJobCommandFactory.getCommand(job.getId(), exception);
LOGGER.trace("Using FailedJobCommandFactory '{}' and command of type '{}'", failedJobCommandFactory.getClass(), cmd.getClass());
jobServiceConfiguration.getCommandExecutor().execute(commandConfig, cmd);
// Dispatch an event, indicating job execution failed in a
// try-catch block, to prevent the original exception to be swallowed
if (CommandContextUtil.getEventDispatcher().isEnabled()) {
try {
CommandContextUtil.getEventDispatcher().dispatchEvent(FlowableJobEventBuilder.createEntityExceptionEvent(FlowableEngineEventType.JOB_EXECUTION_FAILURE, job, exception));
} catch (Throwable ignore) {
LOGGER.warn("Exception occurred while dispatching job failure event, ignoring.", ignore);
}
}
return null;
for (AsyncRunnableExecutionExceptionHandler asyncRunnableExecutionExceptionHandler : asyncRunnableExecutionExceptionHandlers) {
if (asyncRunnableExecutionExceptionHandler.handleException(this.jobServiceConfiguration, this.job, exception)) {
return;
}
});
}
LOGGER.error("Unable to handle exception {} for job {}.", exception, job);
throw new FlowableException("Unable to handle exception " + exception.getMessage() + " for job " + job.getId() + ".", exception);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册