提交 d9c2f610 编写于 作者: J Joram Barrez

Merge branch 'master' of github.com:Activiti/Activiti

......@@ -75,6 +75,7 @@ public abstract class CamelBehavior extends BpmnActivityBehavior implements Acti
ActivitiEndpoint endpoint = createEndpoint(execution);
Exchange exchange = createExchange(execution, endpoint);
endpoint.process(exchange);
handleCamelException(exchange);
execution.setVariables(ExchangeUtils.prepareVariables(exchange, endpoint));
performDefaultOutgoingBehavior(execution);
}
......@@ -101,6 +102,14 @@ public abstract class CamelBehavior extends BpmnActivityBehavior implements Acti
return ex;
}
protected void handleCamelException(Exchange exchange) {
Exception camelException = exchange.getException();
boolean notHandledByCamel = exchange.isFailed() && camelException != null;
if (notHandledByCamel) {
throw new ActivitiException("Unhandled exception on camel route", camelException);
}
}
protected void copyVariablesToProperties(Map<String, Object> variables, Exchange exchange) {
for (Map.Entry<String, Object> var : variables.entrySet()) {
exchange.setProperty(var.getKey(), var.getValue());
......
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activiti.camel;
import java.util.HashMap;
import java.util.Map;
import org.activiti.camel.util.Routing;
import org.activiti.engine.runtime.Job;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.engine.test.Deployment;
import org.activiti.spring.impl.test.SpringActivitiTestCase;
import org.springframework.test.context.ContextConfiguration;
/**
* Demonstrates an issue in Activiti 5.12. Exception on Camel routes will be lost,
* if default error handling is used.
*
* @author stefan.schulze@accelsis.biz
*
*/
@ContextConfiguration("classpath:error-camel-activiti-context.xml")
public class ErrorHandlingTest extends SpringActivitiTestCase {
private static final int WAIT = 3000;
private static final String PREVIOUS_WAIT_STATE = "LogProcessStart";
private static final String NEXT_WAIT_STATE = "ReceiveResult";
/**
* Process instance should be removed after completion. Works as intended,
* if no exception interrupts the Camel route.
*
* @throws Exception
*/
@Deployment(resources = {"process/errorHandling.bpmn20.xml"})
public void testCamelRouteWorksAsIntended() throws Exception {
Map<String, Object> variables = new HashMap<String, Object>();
variables.put("routing", Routing.DEFAULT);
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
"ErrorHandling", variables);
Job job = managementService.createJobQuery().processInstanceId(processInstance.getId()).singleResult();
assertNotNull(job);
managementService.executeJob(job.getId());
Thread.sleep(WAIT);
assertEquals("Process instance not completed", 0, runtimeService.createProcessInstanceQuery()
.processInstanceId(processInstance.getId())
.count());
}
/**
* Expected behavior, with default error handling in Camel:
* Roll-back to previous wait state. Fails with Activiti 5.12.
*
* @throws Exception
*/
@Deployment(resources = {"process/errorHandling.bpmn20.xml"})
public void testRollbackOnException() throws Exception {
Map<String, Object> variables = new HashMap<String, Object>();
variables.put("routing", Routing.PROVOKE_ERROR);
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
"ErrorHandling", variables);
assertEquals("No roll-back to previous wait state", 1, runtimeService.createExecutionQuery()
.processInstanceId(processInstance.getId())
.activityId(PREVIOUS_WAIT_STATE)
.count());
assertEquals("Process instance advanced to next wait state", 0, runtimeService.createExecutionQuery()
.processInstanceId(processInstance.getId())
.activityId(NEXT_WAIT_STATE)
.count());
}
/**
* Exception caught and processed by Camel dead letter queue handler.
* Process instance proceeds to ReceiveTask as expected.
*
* @throws Exception
*/
@Deployment(resources = {"process/errorHandling.bpmn20.xml"})
public void testErrorHandledByCamel() throws Exception {
Map<String, Object> variables = new HashMap<String, Object>();
variables.put("routing", Routing.HANDLE_ERROR);
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
"ErrorHandling", variables);
Job job = managementService.createJobQuery().processInstanceId(processInstance.getId()).singleResult();
assertNotNull(job);
managementService.executeJob(job.getId());
Thread.sleep(WAIT);
assertEquals("Process instance did not reach next wait state", 1, runtimeService.createExecutionQuery()
.processInstanceId(processInstance.getId())
.activityId(NEXT_WAIT_STATE)
.count());
}
}
package org.activiti.camel.route;
import org.activiti.camel.util.TimeConsumingService;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
/**
* @author stefan.schulze@accelsis.biz
*
*/
public class InboundErrorRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("seda:inbound").routeId("inbound")
// give Activiti some time to reach synchronization point
.bean(TimeConsumingService.class)
.log(LoggingLevel.INFO, "Returning result ...")
.to("activiti:ErrorHandling:ReceiveResult");
from("seda:dlq").routeId("dlq").log(LoggingLevel.INFO,
"Error handled by camel ...");
}
}
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activiti.camel.route;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
/**
* Creates routes with different behaviors.
*
* @author stefan.schulze@accelsis.biz
*
*/
public class OutboundErrorRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
// no problems
from("activiti:ErrorHandling:NormalExecution").routeId("outbound")
.log(LoggingLevel.INFO, "Normal execution")
.to("seda:inbound");
// always fails with default error handling: propagates exception back to the caller
from("activiti:ErrorHandling:ProvokeError").routeId("error")
.log(LoggingLevel.INFO, "Provoked error")
.beanRef("brokenService") // <-- throws Exception
.to("seda:inbound");
// always fails with specific error handler: exception is not propagated back
from("activiti:ErrorHandling:HandleError").routeId("errorWithDlq")
.errorHandler(deadLetterChannel("seda:dlq"))
.log(LoggingLevel.INFO, "Provoked error")
.beanRef("brokenService") // <-- throws Exception
.to("seda:inbound");
}
}
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activiti.camel.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Use with Camel's bean component.
*
* @author stefan.schulze@accelsis.biz
*
*/
public class BrokenService {
private static final Logger LOG = LoggerFactory.getLogger(BrokenService.class);
/**
* Always throws an exception.
*
* the current Camel message
* @throws BrokenServiceException
*/
public void alwaysFails() throws BrokenServiceException {
LOG.info("{} called", this.getClass().getSimpleName());
throw new BrokenServiceException("Provoked failure");
}
}
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activiti.camel.util;
/**
* Thrown by BrokenService
* @author stefan.schulze@accelsis.biz
*
*/
public class BrokenServiceException extends Exception {
private static final long serialVersionUID = 1L;
public BrokenServiceException(String message) {
super(message);
}
}
package org.activiti.camel.util;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.ExecutionListener;
/**
* Attach to the ReceiveTask (end event).
*
* @author stefan.schulze@accelsis.biz
*
*/
public class DummyExecutionListener implements ExecutionListener {
private static final long serialVersionUID = 1L;
@Override
public void notify(DelegateExecution execution) throws Exception {
// dummy
}
}
package org.activiti.camel.util;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
/**
* ServiceTask to start with.
*
* @author stefan.schulze@accelsis.biz
*
*/
public class DummyJavaDelegate implements JavaDelegate {
@Override
public void execute(DelegateExecution execution) throws Exception {
// dummy
}
}
package org.activiti.camel;
package org.activiti.camel.util;
import org.activiti.camel.ActivitiProducer;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
......
package org.activiti.camel.util;
/**
* Used in the gateway's condition.
*
* @author stefan.schulze@accelsis.biz
*
*/
public enum Routing {
/**
* Process should terminate normally.
*/
DEFAULT,
/**
* Camel route should throw an exception and propagate it back to the caller.
*/
PROVOKE_ERROR,
/**
* Camel route should throw an exception and handle it.
*/
HANDLE_ERROR;
}
package org.activiti.camel;
package org.activiti.camel.util;
import org.activiti.engine.delegate.DelegateExecution;
import org.activiti.engine.delegate.JavaDelegate;
......
package org.activiti.camel.util;
/**
* Simulates some real work, to delay the message on the Camel route.
*
* @author stefan.schulze@accelsis.biz
*
*/
public class TimeConsumingService {
/**
* Spend some time.
*
* @throws InterruptedException
*/
public void doWork() throws InterruptedException {
Thread.sleep(100);
}
}
......@@ -35,13 +35,13 @@
</bean>
<bean id="runtimeService" factory-bean="processEngine" factory-method="getRuntimeService"/>
<camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
<packageScan>
<package>org.activiti.camel.route</package>
</packageScan>
</camelContext>
<bean id="sleepBean" class="org.activiti.camel.SleepBean"/>
<bean id="sleepBean" class="org.activiti.camel.util.SleepBean"/>
</beans>
......@@ -42,6 +42,6 @@
</packageScan>
</camelContext>
<bean id="sleepBean" class="org.activiti.camel.SleepBean"/>
<bean id="sleepBean" class="org.activiti.camel.util.SleepBean"/>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:osgi="http://www.springframework.org/schema/osgi"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="dataSource" class="org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy">
<property name="targetDataSource">
<bean class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
<property name="driverClass" value="org.h2.Driver"/>
<property name="url" value="jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000"/>
<property name="username" value="sa"/>
<property name="password" value=""/>
</bean>
</property>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<bean id="processEngineConfiguration" class="org.activiti.spring.SpringProcessEngineConfiguration">
<property name="dataSource" ref="dataSource"/>
<property name="transactionManager" ref="transactionManager"/>
<property name="databaseSchemaUpdate" value="true"/>
<property name="jobExecutorActivate" value="false"/>
</bean>
<bean id="processEngine" class="org.activiti.spring.ProcessEngineFactoryBean">
<property name="processEngineConfiguration" ref="processEngineConfiguration"/>
</bean>
<bean id="runtimeService" factory-bean="processEngine" factory-method="getRuntimeService"/>
<bean id="outbound" class="org.activiti.camel.route.OutboundErrorRoute"/>
<bean id="inbound" class="org.activiti.camel.route.InboundErrorRoute"/>
<bean id="brokenService" class="org.activiti.camel.util.BrokenService"/>
<bean id="dummyDelegate" class="org.activiti.camel.util.DummyJavaDelegate"/>
<bean id="dummyListener" class="org.activiti.camel.util.DummyExecutionListener"/>
<camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
<routeBuilder ref="outbound"/>
<routeBuilder ref="inbound"/>
</camelContext>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<bpmn2:definitions
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:activiti="http://activiti.org/bpmn"
xmlns:bpmn2="http://www.omg.org/spec/BPMN/20100524/MODEL"
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL BPMN20.xsd"
targetNamespace="http://activiti.org/bpmn">
<bpmn2:process id="ErrorHandling" name="Camel/Activiti error handling demonstration" isExecutable="true">
<bpmn2:startEvent id="StartEvent_1" name="Start Event"/>
<bpmn2:sequenceFlow id="SequenceFlow_1" sourceRef="StartEvent_1" targetRef="LogProcessStart"/>
<bpmn2:serviceTask id="LogProcessStart" activiti:async="true" activiti:delegateExpression="${dummyDelegate}" name="Log Process Start"/>
<bpmn2:sequenceFlow id="SequenceFlow_2" sourceRef="LogProcessStart" targetRef="ExclusiveGateway_1"/>
<bpmn2:exclusiveGateway id="ExclusiveGateway_1" name="Exclusive Gateway" default="SequenceFlow_3"/>
<bpmn2:sequenceFlow id="SequenceFlow_3" sourceRef="ExclusiveGateway_1" targetRef="NormalExecution"/>
<bpmn2:serviceTask id="NormalExecution" name="Normal Execution" activiti:type="camel"/>
<bpmn2:sequenceFlow id="SequenceFlow_5" name="" sourceRef="NormalExecution" targetRef="ExclusiveGateway_2"/>
<bpmn2:sequenceFlow id="SequenceFlow_7" name="" sourceRef="ExclusiveGateway_1" targetRef="ProvokeError">
<bpmn2:conditionExpression xsi:type="bpmn2:tFormalExpression">${routing == 'PROVOKE_ERROR'}</bpmn2:conditionExpression>
</bpmn2:sequenceFlow>
<bpmn2:serviceTask id="ProvokeError" name="Provoke Error" activiti:type="camel"/>
<bpmn2:sequenceFlow id="SequenceFlow_8" name="" sourceRef="ProvokeError" targetRef="ExclusiveGateway_2"/>
<bpmn2:sequenceFlow id="SequenceFlow_9" sourceRef="ExclusiveGateway_1" targetRef="HandleError">
<bpmn2:conditionExpression xsi:type="bpmn2:tFormalExpression">${routing == 'HANDLE_ERROR'}</bpmn2:conditionExpression>
</bpmn2:sequenceFlow>
<bpmn2:serviceTask id="HandleError" name="Handle Error" activiti:type="camel"/>
<bpmn2:sequenceFlow id="SequenceFlow_10" name="" sourceRef="HandleError" targetRef="ExclusiveGateway_2"/>
<bpmn2:exclusiveGateway id="ExclusiveGateway_2" name="Exclusive Gateway"/>
<bpmn2:sequenceFlow id="SequenceFlow_4" name="" sourceRef="ExclusiveGateway_2" targetRef="ReceiveResult"/>
<bpmn2:receiveTask id="ReceiveResult" name="Receive Result">
<bpmn2:extensionElements>
<activiti:executionListener delegateExpression="${dummyListener}" event="end"/>
</bpmn2:extensionElements>
</bpmn2:receiveTask>
<bpmn2:sequenceFlow id="SequenceFlow_6" sourceRef="ReceiveResult" targetRef="EndEvent_1"/>
<bpmn2:endEvent id="EndEvent_1" name="End Event"/>
</bpmn2:process>
</bpmn2:definitions>
\ No newline at end of file
......@@ -64,6 +64,6 @@
</route>
</camelContext>
<bean id="sleepBean" class="org.activiti.camel.SleepBean"/>
<bean id="sleepBean" class="org.activiti.camel.util.SleepBean"/>
</beans>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册