提交 013c7212 编写于 作者: Z zhourui

消息处理

上级 faf27c29
......@@ -144,5 +144,5 @@ if exist "%~dp0local\update" (
)
)
@echo on
"%~dp0jvm\windows\bin\java" -server Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=false -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=20000 -Xms2g -Xmx8g -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -jar "%~dp0console.jar"
"%~dp0jvm\windows\bin\java" -server -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.authenticate=false -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=20000 -Xms2g -Xmx8g -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -jar "%~dp0console.jar"
pause
\ No newline at end of file
......@@ -3,22 +3,22 @@ package com.x.base.core.container;
import java.util.List;
import java.util.Random;
import com.x.base.core.entity.JpaObject;
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.openjpa.slice.DistributionPolicy;
import com.x.base.core.entity.SliceJpaObject;
public class FactorDistributionPolicy implements DistributionPolicy {
private Random random = new Random();
public String distribute(Object pc, List<String> slices, Object context) {
try {
Object o = PropertyUtils.getProperty(pc, SliceJpaObject.distributeFactor_FIELDNAME);
Object o = PropertyUtils.getProperty(pc,JpaObject.distributeFactor_FIELDNAME);
Integer factor = null;
if (null == o) {
factor = random.nextInt(1000);
PropertyUtils.setProperty(pc, SliceJpaObject.distributeFactor_FIELDNAME, factor);
PropertyUtils.setProperty(pc, JpaObject.distributeFactor_FIELDNAME, factor);
} else {
factor = (Integer) o;
}
......
......@@ -64,7 +64,6 @@ public class Main {
slf4jOtherImplOn = true;
}
org.slf4j.impl.StaticLoggerBinder.getSingleton();
System.out.println("logger:" + org.slf4j.LoggerFactory.getLogger(Main.class));
SystemOutErrorSideCopyBuilder.start();
ResourceFactory.bind();
CommandFactory.printStartHelp();
......
......@@ -77,9 +77,9 @@ public class DumpData {
persistence.getName(), PersistenceXmlHelper.properties(cls.getName(), Config.slice().getEnable()));
EntityManager em = emf.createEntityManager();
try {
logger.print("dump data({}/{}): {}, count: {}.", (i + 1), classNames.size(), cls.getName(),
this.estimateCount(em, cls));
this.dump(cls, em);
long estimateCount = this.estimateCount(em, cls);
logger.print("dump data({}/{}): {}, count: {}.", (i + 1), classNames.size(), cls.getName(), estimateCount);
this.dump(cls, em, estimateCount);
} finally {
em.close();
emf.close();
......@@ -104,7 +104,7 @@ public class DumpData {
return this.catalog.values().stream().mapToInt(Integer::intValue).sum();
}
private <T> void dump(Class<T> cls, EntityManager em)
private <T> void dump(Class<T> cls, EntityManager em, long total)
throws IOException, IllegalAccessException, InvocationTargetException, NoSuchMethodException {
/** 创建最终存储文件的目录 */
File directory = new File(dir, cls.getName());
......@@ -114,6 +114,8 @@ public class DumpData {
String id = "";
List<T> list = null;
int count = 0;
int loop = 1;
int btach = (int) ((total + 0.0) / containerEntity.dumpSize());
do {
list = this.list(em, cls, id, containerEntity.dumpSize());
if (ListTools.isNotEmpty(list)) {
......@@ -121,6 +123,7 @@ public class DumpData {
id = BeanUtils.getProperty(list.get(list.size() - 1), JpaObject.id_FIELDNAME);
File file = new File(directory, count + ".json");
FileUtils.write(file, pureGsonDateFormated.toJson(list), DefaultCharset.charset);
logger.print("dumping {}/{} part of data:{}.", loop, btach, cls.getName());
}
em.clear();
} while (ListTools.isNotEmpty(list));
......
......@@ -133,7 +133,7 @@ public class RestoreData {
File file = null;
for (int i = 0; i < files.size(); i++) {
file = files.get(i);
System.out.println("restoring " + (i + 1) + "/" + files.size() + " part of data: " + cls.getName() + ".");
logger.print("restoring {}/{} part of data:{}.", (i + 1), files.size(), cls.getName());
JsonArray raws = this.convert(file);
em.getTransaction().begin();
for (JsonElement o : raws) {
......@@ -143,7 +143,6 @@ public class RestoreData {
}
em.getTransaction().commit();
em.clear();
Runtime.getRuntime().gc();
}
System.out.println("restore data: " + cls.getName() + " completed, count: " + count + ".");
return count;
......
......@@ -37,14 +37,13 @@ public abstract class JettySeverTools {
config.setSendServerVersion(true);
config.setSendDateHeader(false);
ServerConnector sslConnector = new ServerConnector(server,
ServerConnector https = new ServerConnector(server,
new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.asString()),
new HttpConnectionFactory(config));
/* 添加到32,压力测试 */
sslConnector.setAcceptQueueSize(32);
sslConnector.setIdleTimeout(30000);
sslConnector.setPort(port);
server.addConnector(sslConnector);
https.setAcceptQueueSize(-1);
https.setIdleTimeout(30000);
https.setPort(port);
server.addConnector(https);
}
protected static void addHttpConnector(Server server, Integer port) throws Exception {
......@@ -55,8 +54,7 @@ public abstract class JettySeverTools {
config.setSendServerVersion(true);
config.setSendDateHeader(false);
ServerConnector http = new ServerConnector(server, new HttpConnectionFactory(config));
/* 添加到32,压力测试 */
http.setAcceptQueueSize(32);
http.setAcceptQueueSize(-1);
http.setIdleTimeout(30000);
http.setPort(port);
server.addConnector(http);
......
......@@ -50,7 +50,6 @@ class ActionCreate extends BaseAction {
}
}
Instant instant = this.instant(effectivePerson, business, wi, new ArrayList<>(consumersV2.keySet()));
logger.info("生成消息。,,,consumersV2 "+ consumersV2.size());
if (!consumersV2.isEmpty()) {
for (String consumer : consumersV2.keySet()) {
Wi cpwi = wi;
......@@ -88,7 +87,6 @@ class ActionCreate extends BaseAction {
logger.warn("执行消息发送脚本[{}]方法异常:{}", func, e.getMessage());
}
Message message = null;
logger.info("这里是生成消息对象,consumer:"+consumer);
switch (Objects.toString(consumer, "")) {
case MessageConnector.CONSUME_WS:
message = this.wsMessage(effectivePerson, business, cpwi, instant);
......
......@@ -50,7 +50,6 @@ class ActionCreate extends BaseAction {
}
}
Instant instant = this.instant(effectivePerson, business, wi, new ArrayList<>(consumersV2.keySet()));
logger.info("生成消息。,,,consumersV2 "+ consumersV2.size());
if (!consumersV2.isEmpty()) {
for (String consumer : consumersV2.keySet()) {
Wi cpwi = wi;
......@@ -88,7 +87,6 @@ class ActionCreate extends BaseAction {
logger.warn("执行消息发送脚本[{}]方法异常:{}", func, e.getMessage());
}
Message message = null;
logger.info("这里是生成消息对象,consumer:"+consumer);
switch (Objects.toString(consumer, "")) {
case MessageConnector.CONSUME_WS:
message = this.wsMessage(effectivePerson, business, cpwi, instant);
......
......@@ -27,7 +27,7 @@ import com.x.base.core.entity.annotation.ContainerEntity;
import com.x.base.core.project.annotation.FieldDescribe;
@Entity
@ContainerEntity(dumpSize = 1000, type = ContainerEntity.Type.content, reference = ContainerEntity.Reference.strong)
@ContainerEntity(dumpSize = 1000, type = ContainerEntity.Type.log, reference = ContainerEntity.Reference.soft)
@Table(name = PersistenceProperties.Instant.table, uniqueConstraints = {
@UniqueConstraint(name = PersistenceProperties.Instant.table + JpaObject.IndexNameMiddle
+ JpaObject.DefaultUniqueConstraintSuffix, columnNames = { JpaObject.IDCOLUMN,
......
......@@ -20,7 +20,7 @@ import com.x.base.core.entity.annotation.ContainerEntity;
import com.x.base.core.project.annotation.FieldDescribe;
@Entity
@ContainerEntity(dumpSize = 1000, type = ContainerEntity.Type.content, reference = ContainerEntity.Reference.strong)
@ContainerEntity(dumpSize = 1000, type = ContainerEntity.Type.log, reference = ContainerEntity.Reference.soft)
@Table(name = PersistenceProperties.Message.table, uniqueConstraints = {
@UniqueConstraint(name = PersistenceProperties.Message.table + JpaObject.IndexNameMiddle
+ JpaObject.DefaultUniqueConstraintSuffix, columnNames = { JpaObject.IDCOLUMN,
......
......@@ -14,6 +14,8 @@ import com.x.processplatform.assemble.surface.Business;
import com.x.processplatform.core.entity.element.ApplicationDictItem;
import com.x.processplatform.core.entity.element.ApplicationDictItem_;
import org.apache.commons.lang3.StringUtils;
public class ApplicationDictItemFactory extends AbstractFactory {
public ApplicationDictItemFactory(Business abstractBusiness) throws Exception {
......@@ -52,7 +54,11 @@ public class ApplicationDictItemFactory extends AbstractFactory {
CriteriaQuery<ApplicationDictItem> cq = cb.createQuery(ApplicationDictItem.class);
Root<ApplicationDictItem> root = cq.from(ApplicationDictItem.class);
Predicate p = cb.equal(root.get(ApplicationDictItem_.bundle), applicationDict);
p = cb.and(p, cb.equal(root.get("path0"), path0));
if (StringUtils.isEmpty(path0)) {
p = cb.and(p, cb.or(cb.equal(root.get(ApplicationDictItem_.path0), path0), cb.isNull(root.get(ApplicationDictItem_.path0))));
} else {
p = cb.and(p, cb.equal(root.get(ApplicationDictItem_.path0), path0));
}
p = cb.and(p, cb.equal(root.get("path1"), path1));
p = cb.and(p, cb.equal(root.get("path2"), path2));
p = cb.and(p, cb.equal(root.get("path3"), path3));
......
......@@ -111,7 +111,7 @@ class ActionCreate extends BaseAction {
}
/* 驱动工作,使用非队列方式 */
ThisApplication.context().applications().putQuery(x_processplatform_service_processing.class,
Applications.joinQueryUri("work", workId, "prcocssing", "nonblocking"), null, processFlag);
Applications.joinQueryUri("work", workId, "processing", "nonblocking"), null);
} else {
/* 如果是草稿,准备后面的直接打开 */
workId = lastestWorkId;
......@@ -196,15 +196,6 @@ class ActionCreate extends BaseAction {
private static final long serialVersionUID = 1307569946729101786L;
// private static List<String> Includes = ListTools.toList("createTime",
// "updateTime", "completed", "fromActivity",
// "fromActivityType", "fromActivityName", "fromActivityToken",
// "fromTime", "arrivedActivity",
// "arrivedActivityType", "arrivedActivityName", "arrivedActivityToken",
// "arrivedTime", "route",
// "routeName", "work", "workCompleted", "connected", "splitting",
// "splitTokenList", "processingType");
static WrapCopier<WorkLog, Wo> copier = WrapCopierFactory.wo(WorkLog.class, Wo.class, null,
JpaObject.FieldsInvisible);
......
......@@ -103,7 +103,7 @@ class ActionCreateForce extends BaseAction {
}
/* 驱动工作,使用非队列方式 */
ThisApplication.context().applications().putQuery(x_processplatform_service_processing.class,
Applications.joinQueryUri("work", workId, "prcocssing", "nonblocking"), null, processFlag);
Applications.joinQueryUri("work", workId, "processing", "nonblocking"), null, processFlag);
} else {
/* 如果是草稿,准备后面的直接打开 */
workId = lastestWorkId;
......
......@@ -117,7 +117,7 @@ class ActionCreateWithApplicationProcess extends BaseAction {
}
/* 驱动工作,使用非队列方式 */
ThisApplication.context().applications().putQuery(x_processplatform_service_processing.class,
Applications.joinQueryUri("work", workId, "prcocssing", "nonblocking"), null,processFlag);
Applications.joinQueryUri("work", workId, "processing", "nonblocking"), null,processFlag);
} else {
/* 如果是草稿,准备后面的直接打开 */
workId = lastestWorkId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册