ParallelProcessor.java 5.4 KB
Newer Older
R
roo00 已提交
1 2 3 4 5
package com.x.processplatform.service.processing.processor.parallel;

import java.util.ArrayList;
import java.util.List;

R
Ray 已提交
6
import javax.script.CompiledScript;
R
roo00 已提交
7 8
import javax.script.ScriptContext;

R
roo00 已提交
9
import org.apache.commons.lang3.BooleanUtils;
R
roo00 已提交
10 11 12 13 14
import org.apache.commons.lang3.StringUtils;

import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
R
Ray 已提交
15 16
import com.x.base.core.project.scripting.JsonScriptingExecutor;
import com.x.base.core.project.scripting.ScriptingFactory;
R
roo00 已提交
17 18 19 20 21
import com.x.base.core.project.tools.StringTools;
import com.x.processplatform.core.entity.content.Work;
import com.x.processplatform.core.entity.content.WorkLog;
import com.x.processplatform.core.entity.element.Parallel;
import com.x.processplatform.core.entity.element.Route;
Z
zhourui 已提交
22
import com.x.processplatform.core.entity.log.Signal;
R
roo00 已提交
23
import com.x.processplatform.service.processing.Business;
R
roo00 已提交
24 25 26 27 28 29 30 31 32 33 34 35
import com.x.processplatform.service.processing.processor.AeiObjects;

public class ParallelProcessor extends AbstractParallelProcessor {

	private static Logger logger = LoggerFactory.getLogger(ParallelProcessor.class);

	public ParallelProcessor(EntityManagerContainer entityManagerContainer) throws Exception {
		super(entityManagerContainer);
	}

	@Override
	protected Work arriving(AeiObjects aeiObjects, Parallel parallel) throws Exception {
Z
zhourui 已提交
36
		// 发送ProcessingSignal
Z
zhourui 已提交
37 38
		aeiObjects.getProcessingAttributes()
				.push(Signal.parallelArrive(aeiObjects.getWork().getActivityToken(), parallel));
R
roo00 已提交
39 40 41 42 43 44 45 46 47 48
		logger.info(
				"parallel arrvie processing, work title:{}, id:{}, actvity name:{}, id:{}, activityToken:{}, process name:{}, id{}.",
				aeiObjects.getWork().getTitle(), aeiObjects.getWork().getId(), parallel.getName(), parallel.getId(),
				aeiObjects.getWork().getActivityToken(), aeiObjects.getWork().getProcessName(),
				aeiObjects.getWork().getProcess());
		return aeiObjects.getWork();
	}

	@Override
	protected void arrivingCommitted(AeiObjects aeiObjects, Parallel parallel) throws Exception {
Z
zhourui 已提交
49
		// nothing
R
roo00 已提交
50 51 52 53
	}

	@Override
	protected List<Work> executing(AeiObjects aeiObjects, Parallel parallel) throws Exception {
Z
zhourui 已提交
54
		// 发送ProcessingSignal
Z
zhourui 已提交
55 56
		aeiObjects.getProcessingAttributes()
				.push(Signal.parallelExecute(aeiObjects.getWork().getActivityToken(), parallel));
R
roo00 已提交
57 58 59 60
		List<Work> results = new ArrayList<>();
		aeiObjects.getWork().setSplitting(true);
		aeiObjects.getWork().setSplitToken(StringTools.uniqueToken());
		aeiObjects.getWork().getSplitTokenList().add(aeiObjects.getWork().getSplitToken());
R
roo00 已提交
61 62
		/* 并行拆分不影响splitValue */
		// aeiObjects.getWork().setSplitValue("");
R
roo00 已提交
63 64
		/* 新创建并行Work需要单独的workLog,拷贝当前的WorkLog */
		WorkLog mainWorkLog = aeiObjects.getWorkLogs().stream()
R
roo00 已提交
65 66
				.filter(o -> StringUtils.equals(aeiObjects.getWork().getId(), o.getWork())
						&& StringUtils.equals(aeiObjects.getWork().getActivityToken(), o.getFromActivityToken()))
R
roo00 已提交
67 68
				.findFirst().orElse(null);
		mainWorkLog.setSplitting(aeiObjects.getWork().getSplitting());
NoSubject's avatar
NoSubject 已提交
69
		mainWorkLog.getProperties().setSplitTokenList(aeiObjects.getWork().getSplitTokenList());
R
roo00 已提交
70 71 72
		mainWorkLog.setSplitToken(aeiObjects.getWork().getSplitToken());
		mainWorkLog.setSplitValue(aeiObjects.getWork().getSplitValue());
		aeiObjects.getUpdateWorkLogs().add(mainWorkLog);
R
roo00 已提交
73 74 75 76

		List<Route> routes = new ArrayList<>();
		/* 多条路由进行判断 */
		for (Route o : aeiObjects.getRoutes()) {
R
roo00 已提交
77
			ScriptContext scriptContext = aeiObjects.scriptContext();
R
Ray 已提交
78 79 80 81
			//scriptContext.getBindings(ScriptContext.ENGINE_SCOPE).put(ScriptingFactory.BINDING_NAME_ROUTE, o);
			CompiledScript cs = aeiObjects.business().element().getCompiledScript(aeiObjects.getWork().getApplication(),
					o, Business.EVENT_ROUTE);
			if (BooleanUtils.isTrue(JsonScriptingExecutor.evalBoolean(cs, scriptContext))) {
R
roo00 已提交
82 83 84 85 86 87
				routes.add(o);
			}
		}

		for (int i = 0; i < routes.size(); i++) {
			Route route = routes.get(i);
R
roo00 已提交
88 89 90 91 92
			if (i == 0) {
				aeiObjects.getWork().setDestinationRoute(route.getId());
				aeiObjects.getWork().setDestinationRouteName(route.getName());
				results.add(aeiObjects.getWork());
			} else {
R
roo00 已提交
93
				Work work = new Work(aeiObjects.getWork());
R
roo00 已提交
94 95
				work.setDestinationRoute(route.getId());
				work.setDestinationRouteName(route.getName());
Z
zhourui 已提交
96
				// 创建新的Token
R
roo00 已提交
97
				WorkLog workLog = new WorkLog(mainWorkLog);
R
roo00 已提交
98 99 100 101 102 103 104 105 106 107
				workLog.setWork(work.getId());
				aeiObjects.getCreateWorks().add(work);
				aeiObjects.getCreateWorkLogs().add(workLog);
				results.add(work);
			}
		}
		return results;
	}

	@Override
108
	protected void executingCommitted(AeiObjects aeiObjects, Parallel parallel, List<Work> works) throws Exception {
Z
zhourui 已提交
109
		// nothing
R
roo00 已提交
110 111 112 113
	}

	@Override
	protected List<Route> inquiring(AeiObjects aeiObjects, Parallel parallel) throws Exception {
Z
zhourui 已提交
114
		// 发送ProcessingSignal
Z
zhourui 已提交
115 116
		aeiObjects.getProcessingAttributes()
				.push(Signal.parallelInquire(aeiObjects.getWork().getActivityToken(), parallel));
R
roo00 已提交
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
		List<Route> results = new ArrayList<>();
		aeiObjects.getRoutes().stream().forEach(o -> {
			if (StringUtils.equals(o.getId(), aeiObjects.getWork().getDestinationRoute())) {
				results.add(o);
			}
		});
		if (results.isEmpty()) {
			aeiObjects.getRoutes().stream().forEach(o -> {
				if (StringUtils.equals(o.getName(), aeiObjects.getWork().getDestinationRouteName())) {
					results.add(o);
				}
			});
		}
		return results;
	}

	@Override
	protected void inquiringCommitted(AeiObjects aeiObjects, Parallel parallel) throws Exception {
Z
zhourui 已提交
135
		// nothing
R
roo00 已提交
136 137
	}
}