/* Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.activiti.engine.impl.persistence.entity;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;

import org.activiti.bpmn.model.MessageEventDefinition;
import org.activiti.bpmn.model.Signal;
import org.activiti.bpmn.model.SignalEventDefinition;
import org.activiti.engine.ActivitiException;
import org.activiti.engine.ProcessEngineConfiguration;
import org.activiti.engine.impl.EventSubscriptionQueryImpl;
import org.activiti.engine.impl.Page;
import org.activiti.engine.impl.event.EventHandler;
import org.activiti.engine.impl.jobexecutor.ProcessEventJobHandler;
import org.activiti.engine.impl.persistence.entity.data.DataManager;
import org.activiti.engine.impl.persistence.entity.data.EventSubscriptionDataManager;

/**
 * @author Joram Barrez
 * @author Tijs Rademakers
 */
public class EventSubscriptionEntityManagerImpl extends AbstractEntityManager<EventSubscriptionEntity> implements EventSubscriptionEntityManager {
38
  
39
  protected EventSubscriptionDataManager eventSubscriptionDataManager;
40
  
41 42
  public EventSubscriptionEntityManagerImpl() {
    
43 44
  }
  
45 46
  public EventSubscriptionEntityManagerImpl(EventSubscriptionDataManager eventSubscriptionDataManager) {
    this.eventSubscriptionDataManager = eventSubscriptionDataManager;
47
  }
48
  
49
  @Override
50 51
  protected DataManager<EventSubscriptionEntity> getDataManager() {
    return eventSubscriptionDataManager;
52
  }
53
  
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
  @Override
  public CompensateEventSubscriptionEntity createCompensateEventSubscription() {
    return eventSubscriptionDataManager.createCompensateEventSubscription();
  }
  
  @Override
  public MessageEventSubscriptionEntity createMessageEventSubscription() {
    return eventSubscriptionDataManager.createMessageEventSubscription();
  }
  
  @Override
  public SignalEventSubscriptionEntity createSignalEventSubscription() {
    return eventSubscriptionDataManager.createSignalEventSubscription();
  }
  
69 70
  @Override
  public SignalEventSubscriptionEntity insertSignalEvent(SignalEventDefinition signalEventDefinition, Signal signal, ExecutionEntity execution) {
71
    SignalEventSubscriptionEntity subscriptionEntity = createSignalEventSubscription();
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
    subscriptionEntity.setExecution(execution);
    if (signal != null) {
      subscriptionEntity.setEventName(signal.getName());
      if (signal.getScope() != null) {
        subscriptionEntity.setConfiguration(signal.getScope());
      }
    } else {
      subscriptionEntity.setEventName(signalEventDefinition.getSignalRef());
    }

    subscriptionEntity.setActivityId(execution.getCurrentActivityId());
    subscriptionEntity.setProcessDefinitionId(execution.getProcessDefinitionId());
    if (execution.getTenantId() != null) {
      subscriptionEntity.setTenantId(execution.getTenantId());
    }
    insert(subscriptionEntity);
    execution.getEventSubscriptions().add(subscriptionEntity);
    return subscriptionEntity;
  }

  @Override
  public MessageEventSubscriptionEntity insertMessageEvent(MessageEventDefinition messageEventDefinition, ExecutionEntity execution) {
94
    MessageEventSubscriptionEntity subscriptionEntity = createMessageEventSubscription();
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
    subscriptionEntity.setExecution(execution);
    subscriptionEntity.setEventName(messageEventDefinition.getMessageRef());

    subscriptionEntity.setActivityId(execution.getCurrentActivityId());
    subscriptionEntity.setProcessDefinitionId(execution.getProcessDefinitionId());
    if (execution.getTenantId() != null) {
      subscriptionEntity.setTenantId(execution.getTenantId());
    }
    insert(subscriptionEntity);
    execution.getEventSubscriptions().add(subscriptionEntity);
    return subscriptionEntity;
  }
  
  @Override
  public CompensateEventSubscriptionEntity insertCompensationEvent(ExecutionEntity execution, String activityId) {
110
    CompensateEventSubscriptionEntity eventSubscription = createCompensateEventSubscription();
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
    eventSubscription.setExecution(execution);
    eventSubscription.setActivityId(activityId);
    if (execution.getTenantId() != null) {
      eventSubscription.setTenantId(execution.getTenantId());
    }
    insert(eventSubscription);
    return eventSubscription;
  }
  
  @Override
  public List<CompensateEventSubscriptionEntity> findCompensateEventSubscriptionsByExecutionId(String executionId) {
    return findCompensateEventSubscriptionsByExecutionIdAndActivityId(executionId, null);
  }

  @Override
  public List<CompensateEventSubscriptionEntity> findCompensateEventSubscriptionsByExecutionIdAndActivityId(String executionId, String activityId) {
    List<EventSubscriptionEntity> eventSubscriptions = findEventSubscriptionsByExecutionAndType(executionId, "compensate");
    List<CompensateEventSubscriptionEntity> result = new ArrayList<CompensateEventSubscriptionEntity>();
    for (EventSubscriptionEntity eventSubscriptionEntity : eventSubscriptions) {
      if (eventSubscriptionEntity instanceof CompensateEventSubscriptionEntity) {
        if (activityId == null || activityId.equals(eventSubscriptionEntity.getActivityId())) {
          result.add((CompensateEventSubscriptionEntity) eventSubscriptionEntity);
        }
      }
    }
    return result;
  }
  
  @Override
  public List<CompensateEventSubscriptionEntity> findCompensateEventSubscriptionsByProcessInstanceIdAndActivityId(String processInstanceId, String activityId) {
    List<EventSubscriptionEntity> eventSubscriptions = findEventSubscriptionsByProcessInstanceAndActivityId(processInstanceId, activityId, "compensate");
    List<CompensateEventSubscriptionEntity> result = new ArrayList<CompensateEventSubscriptionEntity>();
    for (EventSubscriptionEntity eventSubscriptionEntity : eventSubscriptions) {
      result.add((CompensateEventSubscriptionEntity) eventSubscriptionEntity);
    }
    return result;
  }
  
  protected void addToExecution(EventSubscriptionEntity eventSubscriptionEntity) {
    // add reference in execution
151
    ExecutionEntity execution = eventSubscriptionEntity.getExecution();
152 153 154 155 156 157 158
    if (execution != null) {
      execution.getEventSubscriptions().add(eventSubscriptionEntity);
    }
  }
  
  @Override
  public long findEventSubscriptionCountByQueryCriteria(EventSubscriptionQueryImpl eventSubscriptionQueryImpl) {
159
    return eventSubscriptionDataManager.findEventSubscriptionCountByQueryCriteria(eventSubscriptionQueryImpl);
160 161 162 163
  }

  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByQueryCriteria(EventSubscriptionQueryImpl eventSubscriptionQueryImpl, Page page) {
164
    return eventSubscriptionDataManager.findEventSubscriptionsByQueryCriteria(eventSubscriptionQueryImpl, page);
165 166 167
  }
  
  @Override
168 169
  public List<MessageEventSubscriptionEntity> findMessageEventSubscriptionsByProcessInstanceAndEventName(String processInstanceId, String eventName) {
    return eventSubscriptionDataManager.findMessageEventSubscriptionsByProcessInstanceAndEventName(processInstanceId, eventName);
170 171 172
  }

  @Override
173 174
  public List<SignalEventSubscriptionEntity> findSignalEventSubscriptionsByEventName(String eventName, String tenantId) {
    return eventSubscriptionDataManager.findSignalEventSubscriptionsByEventName(eventName, tenantId);
175 176 177
  }

  @Override
178 179
  public List<SignalEventSubscriptionEntity> findSignalEventSubscriptionsByProcessInstanceAndEventName(String processInstanceId, String eventName) {
    return eventSubscriptionDataManager.findSignalEventSubscriptionsByProcessInstanceAndEventName(processInstanceId, eventName);
180 181 182
  }

  @Override
183 184
  public List<SignalEventSubscriptionEntity> findSignalEventSubscriptionsByExecution(String executionId) {
    return eventSubscriptionDataManager.findSignalEventSubscriptionsByExecution(executionId);
185 186 187
  }

  @Override
188 189
  public List<SignalEventSubscriptionEntity> findSignalEventSubscriptionsByNameAndExecution(String name, String executionId) {
    return eventSubscriptionDataManager.findSignalEventSubscriptionsByNameAndExecution(name, executionId);
190 191 192
  }

  @Override
193
  public List<EventSubscriptionEntity> findEventSubscriptionsByExecutionAndType(final String executionId, final String type) {
194
    return eventSubscriptionDataManager.findEventSubscriptionsByExecutionAndType(executionId, type);
195 196 197
  }
  
  @Override
198 199
  public List<EventSubscriptionEntity> findEventSubscriptionsByProcessInstanceAndActivityId(String processInstanceId, String activityId, String type) {
    return eventSubscriptionDataManager.findEventSubscriptionsByProcessInstanceAndActivityId(processInstanceId, activityId, type);
200 201 202
  }

  @Override
203
  public List<EventSubscriptionEntity> findEventSubscriptionsByExecution(final String executionId) {
204
    return eventSubscriptionDataManager.findEventSubscriptionsByExecution(executionId);
205 206 207 208
  }

  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByConfiguration(String type, String configuration, String tenantId) {
209
    return eventSubscriptionDataManager.findEventSubscriptionsByConfiguration(type, configuration, tenantId);
210 211 212
  }

  @Override
213 214
  public List<EventSubscriptionEntity> findEventSubscriptionsByName(String type, String eventName, String tenantId) {
    return eventSubscriptionDataManager.findEventSubscriptionsByName(type, eventName, tenantId);
215 216 217 218
  }

  @Override
  public List<EventSubscriptionEntity> findEventSubscriptionsByNameAndExecution(String type, String eventName, String executionId) {
219
    return eventSubscriptionDataManager.findEventSubscriptionsByNameAndExecution(type, eventName, executionId);
220 221 222 223
  }

  @Override
  public MessageEventSubscriptionEntity findMessageStartEventSubscriptionByName(String messageName, String tenantId) {
224
    return eventSubscriptionDataManager.findMessageStartEventSubscriptionByName(messageName, tenantId);
225 226 227 228
  }

  @Override
  public void updateEventSubscriptionTenantId(String oldTenantId, String newTenantId) {
229
    eventSubscriptionDataManager.updateEventSubscriptionTenantId(oldTenantId, newTenantId);
230 231
  }
  
232 233 234 235
  @Override
  public void deleteEventSubscriptionsForProcessDefinition(String processDefinitionId) {
    eventSubscriptionDataManager.deleteEventSubscriptionsForProcessDefinition(processDefinitionId);
  }
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
  
  // Processing /////////////////////////////////////////////////////////////
  
  @Override
  public void eventReceived(EventSubscriptionEntity eventSubscriptionEntity, Object payload, boolean processASync) {
    if (processASync) {
      scheduleEventAsync(eventSubscriptionEntity, payload);
    } else {
      processEventSync(eventSubscriptionEntity, payload);
    }
  }

  protected void processEventSync(EventSubscriptionEntity eventSubscriptionEntity, Object payload) {
    
    // A compensate event needs to be deleted before the handlers are called
    if (eventSubscriptionEntity instanceof CompensateEventSubscriptionEntity) {
      delete(eventSubscriptionEntity);
    }
    
255
    EventHandler eventHandler = getProcessEngineConfiguration().getEventHandler(eventSubscriptionEntity.getEventType());
256 257 258
    if (eventHandler == null) {
      throw new ActivitiException("Could not find eventhandler for event of type '" + eventSubscriptionEntity.getEventType() + "'.");
    }
259
    eventHandler.handleEvent(eventSubscriptionEntity, payload, getCommandContext());
260 261 262 263
  }

  protected void scheduleEventAsync(EventSubscriptionEntity eventSubscriptionEntity, Object payload) {

264
    MessageEntity message = getJobEntityManager().createMessage();
265 266 267 268 269
    message.setJobHandlerType(ProcessEventJobHandler.TYPE);
    message.setJobHandlerConfiguration(eventSubscriptionEntity.getId());
    message.setTenantId(eventSubscriptionEntity.getTenantId());

    GregorianCalendar expireCal = new GregorianCalendar();
270
    ProcessEngineConfiguration processEngineConfig = getProcessEngineConfiguration();
271 272 273 274 275 276 277 278 279
    expireCal.setTime(processEngineConfig.getClock().getCurrentTime());
    expireCal.add(Calendar.SECOND, processEngineConfig.getLockTimeAsyncJobWaitTime());
    message.setLockExpirationTime(expireCal.getTime());

    // TODO: support payload
    // if(payload != null) {
    // message.setEventPayload(payload);
    // }

280
    getJobEntityManager().send(message);
281 282
  }
  
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
  protected List<SignalEventSubscriptionEntity> toSignalEventSubscriptionEntityList(List<EventSubscriptionEntity> result) {
    List<SignalEventSubscriptionEntity> signalEventSubscriptionEntities = new ArrayList<SignalEventSubscriptionEntity>(result.size());
    for (EventSubscriptionEntity eventSubscriptionEntity : result ) {
      signalEventSubscriptionEntities.add((SignalEventSubscriptionEntity) eventSubscriptionEntity);
    }
    return signalEventSubscriptionEntities;
  }
  
  protected List<MessageEventSubscriptionEntity> toMessageEventSubscriptionEntityList(List<EventSubscriptionEntity> result) {
    List<MessageEventSubscriptionEntity> messageEventSubscriptionEntities = new ArrayList<MessageEventSubscriptionEntity>(result.size());
    for (EventSubscriptionEntity eventSubscriptionEntity : result ) {
      messageEventSubscriptionEntities.add((MessageEventSubscriptionEntity) eventSubscriptionEntity);
    }
    return messageEventSubscriptionEntities;
  }
298 299 300 301 302 303 304 305

  public EventSubscriptionDataManager getEventSubscriptionDataManager() {
    return eventSubscriptionDataManager;
  }

  public void setEventSubscriptionDataManager(EventSubscriptionDataManager eventSubscriptionDataManager) {
    this.eventSubscriptionDataManager = eventSubscriptionDataManager;
  }
306
  
307 308

}