/* Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.activiti.engine.impl.persistence.entity;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;

import org.activiti.bpmn.model.MessageEventDefinition;
import org.activiti.bpmn.model.Signal;
import org.activiti.bpmn.model.SignalEventDefinition;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.impl.EventSubscriptionQueryImpl;
import org.activiti.engine.impl.Page;
import org.activiti.engine.impl.event.EventHandler;
import org.activiti.engine.impl.jobexecutor.ProcessEventJobHandler;
import org.activiti.engine.impl.persistence.entity.data.DataManager;
import org.activiti.engine.impl.persistence.entity.data.EventSubscriptionDataManager;

/**
 * {@link EventSubscriptionEntityManager} implementation that creates, stores, looks up and dispatches
 * event subscriptions. Creation and lookups are delegated to an {@link EventSubscriptionDataManager};
 * this class adds the population of new subscriptions from an execution and the synchronous or
 * asynchronous delivery of received events.
 *
 * <p>NOTE(review): the generic type arguments used in this class are derived from how each list is
 * consumed here (loop variables and casts) and from the method names; confirm that they match the
 * declarations in {@link EventSubscriptionEntityManager} and {@link AbstractEntityManager}.
 *
 * @author Joram Barrez
 * @author Tijs Rademakers
 */
public class EventSubscriptionEntityManagerImpl extends AbstractEntityManager<EventSubscriptionEntity> implements EventSubscriptionEntityManager {

  /** Data manager every creation and query in this class is delegated to. */
  protected EventSubscriptionDataManager eventSubscriptionDataManager;

  /**
   * Creates a manager without a data manager; one has to be supplied through
   * {@link #setEventSubscriptionDataManager(EventSubscriptionDataManager)} before use.
   */
  public EventSubscriptionEntityManagerImpl() {

  }

  /**
   * Creates a manager backed by the given data manager.
   *
   * @param eventSubscriptionDataManager the data manager to delegate to
   */
  public EventSubscriptionEntityManagerImpl(EventSubscriptionDataManager eventSubscriptionDataManager) {
    this.eventSubscriptionDataManager = eventSubscriptionDataManager;
  }

  /**
   * @return the event subscription data manager held by this instance
   */
  @Override
  protected DataManager<EventSubscriptionEntity> getDataManager() {
    return eventSubscriptionDataManager;
  }

  /**
   * @return a new compensate event subscription as created by the data manager
   */
  @Override
  public CompensateEventSubscriptionEntity createCompensateEventSubscription() {
    return eventSubscriptionDataManager.createCompensateEventSubscription();
  }

  /**
   * @return a new message event subscription as created by the data manager
   */
  @Override
  public MessageEventSubscriptionEntity createMessageEventSubscription() {
    return eventSubscriptionDataManager.createMessageEventSubscription();
  }

  /**
   * @return a new signal event subscription as created by the data manager
   */
  @Override
  public SignalEventSubscriptionEntity createSignalEventSubscription() {
    return eventSubscriptionDataManager.createSignalEventSubscription();
  }

  /**
   * Creates and inserts a signal event subscription for the given execution and adds it to the
   * execution's event subscriptions.
   *
   * @param signalEventDefinition definition whose signal reference is used as event name when
   *        {@code signal} is {@code null}
   * @param signal the signal supplying the event name and, when its scope is set, the
   *        configuration; may be {@code null}
   * @param execution the execution the subscription is created for
   * @return the inserted subscription
   */
  @Override
  public SignalEventSubscriptionEntity insertSignalEvent(SignalEventDefinition signalEventDefinition, Signal signal, ExecutionEntity execution) {
    SignalEventSubscriptionEntity subscriptionEntity = createSignalEventSubscription();
    subscriptionEntity.setExecution(execution);

    if (signal != null) {
      subscriptionEntity.setEventName(signal.getName());
      // The scope is only stored as configuration when the signal defines one.
      if (signal.getScope() != null) {
        subscriptionEntity.setConfiguration(signal.getScope());
      }
    } else {
      // No signal object available: fall back to the reference on the definition.
      subscriptionEntity.setEventName(signalEventDefinition.getSignalRef());
    }

    subscriptionEntity.setActivityId(execution.getCurrentActivityId());
    subscriptionEntity.setProcessDefinitionId(execution.getProcessDefinitionId());
    if (execution.getTenantId() != null) {
      subscriptionEntity.setTenantId(execution.getTenantId());
    }

    insert(subscriptionEntity);
    execution.getEventSubscriptions().add(subscriptionEntity);
    return subscriptionEntity;
  }

  /**
   * Creates and inserts a message event subscription for the given execution and adds it to the
   * execution's event subscriptions.
   *
   * @param messageEventDefinition definition whose message reference becomes the event name
   * @param execution the execution the subscription is created for
   * @return the inserted subscription
   */
  @Override
  public MessageEventSubscriptionEntity insertMessageEvent(MessageEventDefinition messageEventDefinition, ExecutionEntity execution) {
    MessageEventSubscriptionEntity subscriptionEntity = createMessageEventSubscription();
    subscriptionEntity.setExecution(execution);
    subscriptionEntity.setEventName(messageEventDefinition.getMessageRef());

    subscriptionEntity.setActivityId(execution.getCurrentActivityId());
    subscriptionEntity.setProcessDefinitionId(execution.getProcessDefinitionId());
    if (execution.getTenantId() != null) {
      subscriptionEntity.setTenantId(execution.getTenantId());
    }

    insert(subscriptionEntity);
    execution.getEventSubscriptions().add(subscriptionEntity);
    return subscriptionEntity;
  }

  /**
   * Creates and inserts a compensate event subscription for the given execution and activity.
   * Unlike the signal and message variants, the subscription is not added to the execution's
   * event subscription collection by this method.
   *
   * @param execution the execution the subscription is created for
   * @param activityId the activity id stored on the subscription
   * @return the inserted subscription
   */
  @Override
  public CompensateEventSubscriptionEntity insertCompensationEvent(ExecutionEntity execution, String activityId) {
    CompensateEventSubscriptionEntity eventSubscription = createCompensateEventSubscription();
    eventSubscription.setExecution(execution);
    eventSubscription.setActivityId(activityId);
    if (execution.getTenantId() != null) {
      eventSubscription.setTenantId(execution.getTenantId());
    }
    insert(eventSubscription);
    return eventSubscription;
  }

  /**
   * Finds all compensate event subscriptions of an execution, regardless of activity.
   *
   * @param executionId id of the execution
   * @return the matching subscriptions; never {@code null}
   */
  @Override
  public List<CompensateEventSubscriptionEntity> findCompensateEventSubscriptionsByExecutionId(String executionId) {
    return findCompensateEventSubscriptionsByExecutionIdAndActivityId(executionId, null);
  }

  /**
   * Finds the compensate event subscriptions of an execution, optionally restricted to one activity.
   *
   * @param executionId id of the execution
   * @param activityId activity id to match, or {@code null} to accept every activity
   * @return a new list with the matching subscriptions; never {@code null}
   */
  @Override
  public List<CompensateEventSubscriptionEntity> findCompensateEventSubscriptionsByExecutionIdAndActivityId(String executionId, String activityId) {
    List<EventSubscriptionEntity> eventSubscriptions = findEventSubscriptionsByExecutionAndType(executionId, "compensate");
    List<CompensateEventSubscriptionEntity> result = new ArrayList<CompensateEventSubscriptionEntity>();
    for (EventSubscriptionEntity eventSubscriptionEntity : eventSubscriptions) {
      // Entries that are not compensate subscriptions are skipped rather than cast.
      if (eventSubscriptionEntity instanceof CompensateEventSubscriptionEntity) {
        if (activityId == null || activityId.equals(eventSubscriptionEntity.getActivityId())) {
          result.add((CompensateEventSubscriptionEntity) eventSubscriptionEntity);
        }
      }
    }
    return result;
  }

  /**
   * Finds the compensate event subscriptions of a process instance for one activity.
   *
   * @param processInstanceId id of the process instance
   * @param activityId id of the activity
   * @return a new list with the matching subscriptions; never {@code null}
   * @throws ClassCastException if the lookup returns an entity that is not a
   *         {@link CompensateEventSubscriptionEntity}; entries are cast without a type check
   */
  @Override
  public List<CompensateEventSubscriptionEntity> findCompensateEventSubscriptionsByProcessInstanceIdAndActivityId(String processInstanceId, String activityId) {
    List<EventSubscriptionEntity> eventSubscriptions = findEventSubscriptionsByProcessInstanceAndActivityId(processInstanceId, activityId, "compensate");
    List<CompensateEventSubscriptionEntity> result = new ArrayList<CompensateEventSubscriptionEntity>();
    for (EventSubscriptionEntity eventSubscriptionEntity : eventSubscriptions) {
      result.add((CompensateEventSubscriptionEntity) eventSubscriptionEntity);
    }
    return result;
  }

  /**
   * Adds the subscription to the event subscriptions of its execution, if it has one.
   *
   * @param eventSubscriptionEntity the subscription to register on its execution
   */
  protected void addToExecution(EventSubscriptionEntity eventSubscriptionEntity) {
    // add reference in execution
    ExecutionEntity execution = eventSubscriptionEntity.getExecution();
    if (execution != null) {
      execution.getEventSubscriptions().add(eventSubscriptionEntity);
    }
  }

  /**
   * @param eventSubscriptionQueryImpl the query criteria
   * @return the count reported by the data manager for the given criteria
   */
  @Override
  public long findEventSubscriptionCountByQueryCriteria(EventSubscriptionQueryImpl eventSubscriptionQueryImpl) {
    return eventSubscriptionDataManager.findEventSubscriptionCountByQueryCriteria(eventSubscriptionQueryImpl);
  }

  /**
   * @param eventSubscriptionQueryImpl the query criteria
   * @param page paging information passed through to the data manager
   * @return the result of the data manager query for the given criteria and page
   */
  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByQueryCriteria(EventSubscriptionQueryImpl eventSubscriptionQueryImpl, Page page) {
    return eventSubscriptionDataManager.findEventSubscriptionsByQueryCriteria(eventSubscriptionQueryImpl, page);
  }

  /**
   * @param processInstanceId id of the process instance
   * @param eventName name of the message event
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<MessageEventSubscriptionEntity> findMessageEventSubscriptionsByProcessInstanceAndEventName(String processInstanceId, String eventName) {
    return eventSubscriptionDataManager.findMessageEventSubscriptionsByProcessInstanceAndEventName(processInstanceId, eventName);
  }

  /**
   * @param eventName name of the signal event
   * @param tenantId tenant id passed through to the data manager
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<SignalEventSubscriptionEntity> findSignalEventSubscriptionsByEventName(String eventName, String tenantId) {
    return eventSubscriptionDataManager.findSignalEventSubscriptionsByEventName(eventName, tenantId);
  }

  /**
   * @param processInstanceId id of the process instance
   * @param eventName name of the signal event
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<SignalEventSubscriptionEntity> findSignalEventSubscriptionsByProcessInstanceAndEventName(String processInstanceId, String eventName) {
    return eventSubscriptionDataManager.findSignalEventSubscriptionsByProcessInstanceAndEventName(processInstanceId, eventName);
  }

  /**
   * @param executionId id of the execution
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<SignalEventSubscriptionEntity> findSignalEventSubscriptionsByExecution(String executionId) {
    return eventSubscriptionDataManager.findSignalEventSubscriptionsByExecution(executionId);
  }

  /**
   * @param name name of the signal event
   * @param executionId id of the execution
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<SignalEventSubscriptionEntity> findSignalEventSubscriptionsByNameAndExecution(String name, String executionId) {
    return eventSubscriptionDataManager.findSignalEventSubscriptionsByNameAndExecution(name, executionId);
  }

  /**
   * @param executionId id of the execution
   * @param type event type, for example {@code "compensate"}
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByExecutionAndType(final String executionId, final String type) {
    return eventSubscriptionDataManager.findEventSubscriptionsByExecutionAndType(executionId, type);
  }

  /**
   * @param processInstanceId id of the process instance
   * @param activityId id of the activity
   * @param type event type, for example {@code "compensate"}
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByProcessInstanceAndActivityId(String processInstanceId, String activityId, String type) {
    return eventSubscriptionDataManager.findEventSubscriptionsByProcessInstanceAndActivityId(processInstanceId, activityId, type);
  }

  /**
   * @param executionId id of the execution
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByExecution(final String executionId) {
    return eventSubscriptionDataManager.findEventSubscriptionsByExecution(executionId);
  }

  /**
   * @param type event type
   * @param configuration configuration value to match
   * @param tenantId tenant id passed through to the data manager
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByConfiguration(String type, String configuration, String tenantId) {
    return eventSubscriptionDataManager.findEventSubscriptionsByConfiguration(type, configuration, tenantId);
  }

  /**
   * @param type event type
   * @param eventName name of the event
   * @param tenantId tenant id passed through to the data manager
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByName(String type, String eventName, String tenantId) {
    return eventSubscriptionDataManager.findEventSubscriptionsByName(type, eventName, tenantId);
  }

  /**
   * @param type event type
   * @param eventName name of the event
   * @param executionId id of the execution
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByNameAndExecution(String type, String eventName, String executionId) {
    return eventSubscriptionDataManager.findEventSubscriptionsByNameAndExecution(type, eventName, executionId);
  }

  /**
   * @param messageName name of the message
   * @param tenantId tenant id passed through to the data manager
   * @return the result of the corresponding data manager lookup
   */
  @Override
  public MessageEventSubscriptionEntity findMessageStartEventSubscriptionByName(String messageName, String tenantId) {
    return eventSubscriptionDataManager.findMessageStartEventSubscriptionByName(messageName, tenantId);
  }

  /**
   * Delegates the tenant id update to the data manager.
   *
   * @param oldTenantId tenant id to replace
   * @param newTenantId replacement tenant id
   */
  @Override
  public void updateEventSubscriptionTenantId(String oldTenantId, String newTenantId) {
    eventSubscriptionDataManager.updateEventSubscriptionTenantId(oldTenantId, newTenantId);
  }

  /**
   * Delegates the deletion of the subscriptions of a process definition to the data manager.
   *
   * @param processDefinitionId id of the process definition
   */
  @Override
  public void deleteEventSubscriptionsForProcessDefinition(String processDefinitionId) {
    eventSubscriptionDataManager.deleteEventSubscriptionsForProcessDefinition(processDefinitionId);
  }

  // Processing /////////////////////////////////////////////////////////////

  /**
   * Handles a received event either by scheduling a job for it or by invoking its handler directly.
   *
   * @param eventSubscriptionEntity the subscription the event was received for
   * @param payload the event payload; only used on the synchronous path
   * @param processASync {@code true} to schedule asynchronous processing, {@code false} to process
   *        immediately
   */
  @Override
  public void eventReceived(EventSubscriptionEntity eventSubscriptionEntity, Object payload, boolean processASync) {
    if (processASync) {
      scheduleEventAsync(eventSubscriptionEntity, payload);
    } else {
      processEventSync(eventSubscriptionEntity, payload);
    }
  }

  /**
   * Invokes the event handler registered for the subscription's event type.
   *
   * @param eventSubscriptionEntity the subscription to process
   * @param payload the payload handed to the handler
   * @throws ActivitiException if no handler is registered for the subscription's event type
   */
  protected void processEventSync(EventSubscriptionEntity eventSubscriptionEntity, Object payload) {

    // A compensate event needs to be deleted before the handlers are called
    if (eventSubscriptionEntity instanceof CompensateEventSubscriptionEntity) {
      delete(eventSubscriptionEntity);
    }

    EventHandler eventHandler = getProcessEngineConfiguration().getEventHandler(eventSubscriptionEntity.getEventType());
    if (eventHandler == null) {
      throw new ActivitiException("Could not find eventhandler for event of type '" + eventSubscriptionEntity.getEventType() + "'.");
    }
    eventHandler.handleEvent(eventSubscriptionEntity, payload, getCommandContext());
  }

  /**
   * Creates and sends a message job that refers to the subscription by id, using
   * {@link ProcessEventJobHandler#TYPE} as handler type. The job's lock expiration time is the
   * engine clock's current time plus the configured async job lock wait time in seconds.
   *
   * @param eventSubscriptionEntity the subscription to process asynchronously
   * @param payload the event payload; currently not attached to the job (see TODO below)
   */
  protected void scheduleEventAsync(EventSubscriptionEntity eventSubscriptionEntity, Object payload) {
    MessageEntity message = getJobEntityManager().createMessage();
    message.setJobHandlerType(ProcessEventJobHandler.TYPE);
    message.setJobHandlerConfiguration(eventSubscriptionEntity.getId());
    message.setTenantId(eventSubscriptionEntity.getTenantId());

    GregorianCalendar expireCal = new GregorianCalendar();
    ProcessEngineConfiguration processEngineConfig = getProcessEngineConfiguration();
    expireCal.setTime(processEngineConfig.getClock().getCurrentTime());
    expireCal.add(Calendar.SECOND, processEngineConfig.getLockTimeAsyncJobWaitTime());
    message.setLockExpirationTime(expireCal.getTime());

    // TODO: support payload. The payload argument is not forwarded to the message yet.

    getJobEntityManager().send(message);
  }

  /**
   * Copies the given subscriptions into a new list typed as signal event subscriptions.
   *
   * @param result the subscriptions to convert; must not be {@code null}
   * @return a new list holding every element of {@code result}, in order
   * @throws ClassCastException if an element is not a {@link SignalEventSubscriptionEntity}
   */
  protected List<SignalEventSubscriptionEntity> toSignalEventSubscriptionEntityList(List<EventSubscriptionEntity> result) {
    List<SignalEventSubscriptionEntity> signalEventSubscriptionEntities = new ArrayList<SignalEventSubscriptionEntity>(result.size());
    for (EventSubscriptionEntity eventSubscriptionEntity : result) {
      signalEventSubscriptionEntities.add((SignalEventSubscriptionEntity) eventSubscriptionEntity);
    }
    return signalEventSubscriptionEntities;
  }

  /**
   * Copies the given subscriptions into a new list typed as message event subscriptions.
   *
   * @param result the subscriptions to convert; must not be {@code null}
   * @return a new list holding every element of {@code result}, in order
   * @throws ClassCastException if an element is not a {@link MessageEventSubscriptionEntity}
   */
  protected List<MessageEventSubscriptionEntity> toMessageEventSubscriptionEntityList(List<EventSubscriptionEntity> result) {
    List<MessageEventSubscriptionEntity> messageEventSubscriptionEntities = new ArrayList<MessageEventSubscriptionEntity>(result.size());
    for (EventSubscriptionEntity eventSubscriptionEntity : result) {
      messageEventSubscriptionEntities.add((MessageEventSubscriptionEntity) eventSubscriptionEntity);
    }
    return messageEventSubscriptionEntities;
  }

  /**
   * @return the data manager this instance delegates to
   */
  public EventSubscriptionDataManager getEventSubscriptionDataManager() {
    return eventSubscriptionDataManager;
  }

  /**
   * @param eventSubscriptionDataManager the data manager this instance should delegate to
   */
  public void setEventSubscriptionDataManager(EventSubscriptionDataManager eventSubscriptionDataManager) {
    this.eventSubscriptionDataManager = eventSubscriptionDataManager;
  }

}