提交 10ba7bd1 编写于 作者: Ray

add archive hadoop

上级 f853f190
......@@ -1070,7 +1070,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.sun.mail</groupId>
......
......@@ -131,11 +131,27 @@ public class Message extends GsonPropertyObject {
}
public static Consumer concreteConsumer(String type) {
switch (Objects.toString(type, "")) {
switch (StringUtils.lowerCase(Objects.toString(type, ""))) {
case MessageConnector.CONSUME_WS:
return new WsConsumer();
case MessageConnector.CONSUME_PMS_INNER:
return new PmsinnerConsumer();
case MessageConnector.CONSUME_CALENDAR:
return new CalendarConsumer();
case MessageConnector.CONSUME_DINGDING:
return new DingdingConsumer();
case MessageConnector.CONSUME_WELINK:
return new WelinkConsumer();
case MessageConnector.CONSUME_ZHENGWUDINGDING:
return new ZhengwudingdingConsumer();
case MessageConnector.CONSUME_QIYEWEIXIN:
return new QiyeweixinConsumer();
case MessageConnector.CONSUME_MPWEIXIN:
return new MpweixinConsumer();
case MessageConnector.CONSUME_KAFKA:
return new KafkaConsumer();
case MessageConnector.CONSUME_ACTIVEMQ:
return new ActiveMqConsumer();
return new ActivemqConsumer();
case MessageConnector.CONSUME_RESTFUL:
return new RestfulConsumer();
case MessageConnector.CONSUME_MAIL:
......@@ -153,6 +169,86 @@ public class Message extends GsonPropertyObject {
}
}
// Consumer for the WebSocket ("ws") message channel.
public static class WsConsumer extends Consumer {
private static final long serialVersionUID = 8702816982685612136L;
// Registers under MessageConnector.CONSUME_WS; the boolean flag presumably
// marks the channel as enabled by default — confirm in Consumer's constructor.
public WsConsumer() {
super(MessageConnector.CONSUME_WS, true);
}
}
// Consumer for the internal push-message-service ("pmsInner") channel.
public static class PmsinnerConsumer extends Consumer {
private static final long serialVersionUID = -1246633610717846231L;
// Registers under MessageConnector.CONSUME_PMS_INNER; the boolean flag presumably
// marks the channel as enabled by default — confirm in Consumer's constructor.
public PmsinnerConsumer() {
super(MessageConnector.CONSUME_PMS_INNER, true);
}
}
// Consumer for the calendar message channel.
public static class CalendarConsumer extends Consumer {
private static final long serialVersionUID = -1453591270935170682L;
// Registers under MessageConnector.CONSUME_CALENDAR. NOTE(review): this is the
// only inner consumer constructed with the flag set to false — presumably
// disabled by default; confirm the flag's meaning in Consumer's constructor.
public CalendarConsumer() {
super(MessageConnector.CONSUME_CALENDAR, false);
}
}
// Consumer for the DingTalk ("dingding") message channel.
public static class DingdingConsumer extends Consumer {
private static final long serialVersionUID = -2273422698767839910L;
// Registers under MessageConnector.CONSUME_DINGDING; the boolean flag presumably
// marks the channel as enabled by default — confirm in Consumer's constructor.
public DingdingConsumer() {
super(MessageConnector.CONSUME_DINGDING, true);
}
}
// Consumer for the Huawei WeLink message channel.
public static class WelinkConsumer extends Consumer {
private static final long serialVersionUID = -5796171639649346866L;
// Registers under MessageConnector.CONSUME_WELINK; the boolean flag presumably
// marks the channel as enabled by default — confirm in Consumer's constructor.
public WelinkConsumer() {
super(MessageConnector.CONSUME_WELINK, true);
}
}
// Consumer for the government DingTalk ("zhengwudingding") message channel.
public static class ZhengwudingdingConsumer extends Consumer {
private static final long serialVersionUID = -1805579720843025600L;
// Registers under MessageConnector.CONSUME_ZHENGWUDINGDING; the boolean flag
// presumably marks the channel as enabled by default — confirm in Consumer's
// constructor.
public ZhengwudingdingConsumer() {
super(MessageConnector.CONSUME_ZHENGWUDINGDING, true);
}
}
// Consumer for the WeChat Work ("qiyeweixin") message channel.
public static class QiyeweixinConsumer extends Consumer {
private static final long serialVersionUID = -3957612144231971034L;
// Registers under MessageConnector.CONSUME_QIYEWEIXIN; the boolean flag presumably
// marks the channel as enabled by default — confirm in Consumer's constructor.
public QiyeweixinConsumer() {
super(MessageConnector.CONSUME_QIYEWEIXIN, true);
}
}
// Consumer for the WeChat Official Account ("mpweixin") message channel.
public static class MpweixinConsumer extends Consumer {
private static final long serialVersionUID = -9116678126784563430L;
// Registers under MessageConnector.CONSUME_MPWEIXIN; the boolean flag presumably
// marks the channel as enabled by default — confirm in Consumer's constructor.
public MpweixinConsumer() {
super(MessageConnector.CONSUME_MPWEIXIN, true);
}
}
public static class ApiConsumer extends Consumer {
private static final long serialVersionUID = -4452633351300698272L;
......@@ -270,7 +366,7 @@ public class Message extends GsonPropertyObject {
}
private static final String DEFAULT_DRIVERCLASS = "com.mysql.cj.jdbc.Driver";
private static final String DEFAULT_URL = "jdbc:mysql://127.0.0.1:3306/TEST?autoReconnect=true&allowPublicKeyRetrieval=true&useSSL=false&useUnicode=true&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8";
private static final String DEFAULT_URL = "jdbc:mysql://";
private static final String DEFAULT_USERNAME = "root";
private static final String DEFAULT_PASSWORD = "password";
private static final String DEFAULT_CATALOG = "";
......@@ -386,7 +482,7 @@ public class Message extends GsonPropertyObject {
this.path = DEFAULT_PATH;
}
private static final String DEFAULT_FS_DEFAULTFS = "hdfs://127.0.0.1:9000";
private static final String DEFAULT_FS_DEFAULTFS = "hdfs://";
private static final String DEFAULT_USERNAME = "";
private static final String DEFAULT_PATH = "";
......@@ -426,7 +522,7 @@ public class Message extends GsonPropertyObject {
this.password = DEFAULT_PASSWORD;
}
private static final String DEFAULT_BOOTSTRAPSERVERS = "127.0.0.1:9092";
private static final String DEFAULT_BOOTSTRAPSERVERS = "";
private static final String DEFAULT_TOPIC = "o2oa";
private static final String DEFAULT_SECURITYPROTOCOL = "SASL_PLAINTEXT";
private static final String DEFAULT_SASLMECHANISM = "PLAIN";
......@@ -477,11 +573,11 @@ public class Message extends GsonPropertyObject {
}
public static class ActiveMqConsumer extends Consumer {
public static class ActivemqConsumer extends Consumer {
private static final long serialVersionUID = -7469816290407400176L;
public ActiveMqConsumer() {
public ActivemqConsumer() {
super(MessageConnector.CONSUME_ACTIVEMQ, false);
this.username = DEFAULT_USERNAME;
this.password = DEFAULT_PASSWORD;
......
......@@ -6,18 +6,28 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.lang3.StringUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.x.base.core.project.config.Message.ActiveMqConsumer;
import com.x.base.core.project.config.Message.ActivemqConsumer;
import com.x.base.core.project.config.Message.ApiConsumer;
import com.x.base.core.project.config.Message.CalendarConsumer;
import com.x.base.core.project.config.Message.Consumer;
import com.x.base.core.project.config.Message.DingdingConsumer;
import com.x.base.core.project.config.Message.HadoopConsumer;
import com.x.base.core.project.config.Message.JdbcConsumer;
import com.x.base.core.project.config.Message.KafkaConsumer;
import com.x.base.core.project.config.Message.MailConsumer;
import com.x.base.core.project.config.Message.MpweixinConsumer;
import com.x.base.core.project.config.Message.PmsinnerConsumer;
import com.x.base.core.project.config.Message.QiyeweixinConsumer;
import com.x.base.core.project.config.Message.RestfulConsumer;
import com.x.base.core.project.config.Message.TableConsumer;
import com.x.base.core.project.config.Message.WelinkConsumer;
import com.x.base.core.project.config.Message.WsConsumer;
import com.x.base.core.project.config.Message.ZhengwudingdingConsumer;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.message.MessageConnector;
......@@ -33,10 +43,10 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
MessageConnector.CONSUME_API, MessageConnector.CONSUME_JDBC, MessageConnector.CONSUME_TABLE,
MessageConnector.CONSUME_HADOOP);
private static final Message MESSAGE_INNER = new Message(MessageConnector.CONSUME_WS,
MessageConnector.CONSUME_PMS_INNER, MessageConnector.CONSUME_CALENDAR, MessageConnector.CONSUME_DINGDING,
MessageConnector.CONSUME_WELINK, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_MPWEIXIN);
private static final Message MESSAGE_NOTICE = new Message(MessageConnector.CONSUME_WS,
MessageConnector.CONSUME_PMS_INNER, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_WELINK,
MessageConnector.CONSUME_ZHENGWUDINGDING, MessageConnector.CONSUME_QIYEWEIXIN,
MessageConnector.CONSUME_MPWEIXIN);
private static final Message MESSAGE_OUTER = new Message(MessageConnector.CONSUME_KAFKA,
MessageConnector.CONSUME_ACTIVEMQ, MessageConnector.CONSUME_RESTFUL, MessageConnector.CONSUME_MAIL,
......@@ -93,9 +103,9 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
o.put(MessageConnector.TYPE_TEAMWORK_TASKUPDATE, MESSAGE_ALL.cloneThenSetDescription("工作管理任务更新"));
o.put(MessageConnector.TYPE_TEAMWORK_TASKDELETE, MESSAGE_ALL.cloneThenSetDescription("工作管理任务删除"));
o.put(MessageConnector.TYPE_TEAMWORK_TASKOVERTIME, MESSAGE_ALL.cloneThenSetDescription("工作管理任务超时"));
o.put(MessageConnector.TYPE_TEAMWORK_CHAT, MESSAGE_INNER.cloneThenSetDescription("工作管理聊天"));
o.put(MessageConnector.TYPE_TEAMWORK_CHAT, MESSAGE_NOTICE.cloneThenSetDescription("工作管理聊天"));
o.put(MessageConnector.TYPE_CMS_PUBLISH, MESSAGE_OUTER.cloneThenSetDescription("内容管理发布"));
o.put(MessageConnector.TYPE_CMS_PUBLISH_TO_CREATOR, MESSAGE_INNER.cloneThenSetDescription("内容管理发布创建者通知"));
o.put(MessageConnector.TYPE_CMS_PUBLISH_TO_CREATOR, MESSAGE_NOTICE.cloneThenSetDescription("内容管理发布创建者通知"));
o.put(MessageConnector.TYPE_BBS_SUBJECTCREATE, MESSAGE_ALL.cloneThenSetDescription("论坛创建贴子"));
o.put(MessageConnector.TYPE_BBS_REPLYCREATE, MESSAGE_ALL.cloneThenSetDescription("论坛创建回复"));
o.put(MessageConnector.TYPE_MIND_FILESEND, MESSAGE_ALL.cloneThenSetDescription("脑图发送"));
......@@ -130,12 +140,36 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
JsonObject jsonObject = jsonElement.getAsJsonObject();
JsonElement typeElement = jsonObject.get(Message.Consumer.FIELD_TYPE);
if (null != typeElement && typeElement.isJsonPrimitive()) {
switch (typeElement.getAsString()) {
switch (StringUtils.lowerCase(typeElement.getAsString())) {
case MessageConnector.CONSUME_WS:
list.add(gson.fromJson(jsonElement, WsConsumer.class));
break;
case MessageConnector.CONSUME_PMS_INNER:
list.add(gson.fromJson(jsonElement, PmsinnerConsumer.class));
break;
case MessageConnector.CONSUME_CALENDAR:
list.add(gson.fromJson(jsonElement, CalendarConsumer.class));
break;
case MessageConnector.CONSUME_DINGDING:
list.add(gson.fromJson(jsonElement, DingdingConsumer.class));
break;
case MessageConnector.CONSUME_WELINK:
list.add(gson.fromJson(jsonElement, WelinkConsumer.class));
break;
case MessageConnector.CONSUME_ZHENGWUDINGDING:
list.add(gson.fromJson(jsonElement, ZhengwudingdingConsumer.class));
break;
case MessageConnector.CONSUME_QIYEWEIXIN:
list.add(gson.fromJson(jsonElement, QiyeweixinConsumer.class));
break;
case MessageConnector.CONSUME_MPWEIXIN:
list.add(gson.fromJson(jsonElement, MpweixinConsumer.class));
break;
case MessageConnector.CONSUME_KAFKA:
list.add(gson.fromJson(jsonElement, KafkaConsumer.class));
break;
case MessageConnector.CONSUME_ACTIVEMQ:
list.add(gson.fromJson(jsonElement, ActiveMqConsumer.class));
list.add(gson.fromJson(jsonElement, ActivemqConsumer.class));
break;
case MessageConnector.CONSUME_RESTFUL:
list.add(gson.fromJson(jsonElement, RestfulConsumer.class));
......
......@@ -186,6 +186,9 @@ public class ProcessPlatform extends ConfigObject {
@FieldDescribe("同步到自建表设置.")
private UpdateTable updateTable;
@FieldDescribe("归档到Hadoop.")
private ArchiveHadoop archiveHadoop;
@FieldDescribe("事件扩充.")
private ExtensionEvents extensionEvents;
......@@ -223,8 +226,8 @@ public class ProcessPlatform extends ConfigObject {
return this.attachmentConfig == null ? new AttachmentConfig() : attachmentConfig;
}
public void setAttachmentConfig(AttachmentConfig attachmentConfig) {
this.attachmentConfig = attachmentConfig;
public ArchiveHadoop getArchiveHadoop() {
return this.archiveHadoop == null ? new ArchiveHadoop() : this.archiveHadoop;
}
public Urge getUrge() {
......@@ -306,13 +309,6 @@ public class ProcessPlatform extends ConfigObject {
return BooleanUtils.isTrue(this.enable);
}
public void setCron(String cron) {
this.cron = cron;
}
public void setEnable(Boolean enable) {
this.enable = enable;
}
}
public static class Expire extends ConfigObject {
......@@ -637,14 +633,6 @@ public class ProcessPlatform extends ConfigObject {
return (count == null || count < 0) ? DEFAULT_COUNT : this.count;
}
public void setIntervalMinutes(Integer intervalMinutes) {
this.intervalMinutes = intervalMinutes;
}
public void setCount(Integer count) {
this.count = count;
}
}
public static class AttachmentConfig extends ConfigObject {
......@@ -670,25 +658,14 @@ public class ProcessPlatform extends ConfigObject {
return fileSize;
}
public void setFileSize(Integer fileSize) {
this.fileSize = fileSize;
}
public List<String> getFileTypeIncludes() {
return fileTypeIncludes;
}
public void setFileTypeIncludes(List<String> fileTypeIncludes) {
this.fileTypeIncludes = fileTypeIncludes;
}
public List<String> getFileTypeExcludes() {
return fileTypeExcludes;
}
public void setFileTypeExcludes(List<String> fileTypeExcludes) {
this.fileTypeExcludes = fileTypeExcludes;
}
}
public static class ExtensionEvents {
......@@ -821,50 +798,26 @@ public class ProcessPlatform extends ConfigObject {
return enable;
}
public void setEnable(Boolean enable) {
this.enable = enable;
}
public List<String> getApplications() {
return applications;
}
public void setApplications(List<String> applications) {
this.applications = applications;
}
public List<String> getProcesses() {
return processes;
}
public void setProcesses(List<String> processes) {
this.processes = processes;
}
public List<String> getActivities() {
return activities;
}
public void setActivities(List<String> activities) {
this.activities = activities;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getCustom() {
return custom;
}
public void setCustom(String custom) {
this.custom = custom;
}
}
public static class WorkCompletedExtensionEvents extends ArrayList<WorkCompletedExtensionEvent> {
......@@ -906,59 +859,45 @@ public class ProcessPlatform extends ConfigObject {
return enable;
}
public void setEnable(Boolean enable) {
this.enable = enable;
}
public List<String> getApplications() {
return applications;
}
public void setApplications(List<String> applications) {
this.applications = applications;
}
public List<String> getProcesses() {
return processes;
}
public void setProcesses(List<String> processes) {
this.processes = processes;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getCustom() {
return custom;
}
public void setCustom(String custom) {
this.custom = custom;
}
}
public static class UpdateTable extends ConfigObject {
public static class ArchiveHadoop extends ConfigObject {
private static final long serialVersionUID = -7066262450518673067L;
private static final long serialVersionUID = -8274136904009320770L;
public static UpdateTable defaultInstance() {
return new UpdateTable();
public static ArchiveHadoop defaultInstance() {
return new ArchiveHadoop();
}
public static final Boolean DEFAULT_ENABLE = true;
public ArchiveHadoop() {
this.enable = DEFAULT_ENABLE;
this.cron = DEFAULT_CRON;
this.fsDefaultFS = DEFAULT_FS_DEFAULTFS;
this.username = DEFAULT_USERNAME;
this.path = DEFAULT_PATH;
}
private static final Boolean DEFAULT_ENABLE = false;
public static final String DEFAULT_CRON = "20 20 * * * ?";
public static final Integer DEFAULT_RETRYMINUTES = 20;
public static final Integer DEFAULT_THRESHOLDMINUTES = 60 * 24 * 7;
private static final String DEFAULT_FS_DEFAULTFS = "hdfs://";
private static final String DEFAULT_USERNAME = "";
private static final String DEFAULT_PATH = "";
@FieldDescribe("是否启用")
private Boolean enable = DEFAULT_ENABLE;
......@@ -966,11 +905,14 @@ public class ProcessPlatform extends ConfigObject {
@FieldDescribe("定时cron表达式.")
private String cron = DEFAULT_CRON;
@FieldDescribe("重试间隔(分钟),默认20分钟.")
private Integer retryMinutes = DEFAULT_RETRYMINUTES;
@FieldDescribe("hadoop地址.")
private String fsDefaultFS;
@FieldDescribe("最大保留期限(分钟),默认10080分钟(7天).")
private Integer thresholdMinutes = DEFAULT_THRESHOLDMINUTES;
@FieldDescribe("hadoop用户名.")
private String username;
@FieldDescribe("fs路径前缀.")
private String path;
public String getCron() {
if (StringUtils.isNotEmpty(this.cron) && CronExpression.isValidExpression(this.cron)) {
......@@ -984,28 +926,47 @@ public class ProcessPlatform extends ConfigObject {
return BooleanUtils.isTrue(this.enable);
}
public void setCron(String cron) {
this.cron = cron;
public String getFsDefaultFS() {
return StringUtils.isEmpty(this.fsDefaultFS) ? DEFAULT_FS_DEFAULTFS : this.fsDefaultFS;
}
public void setEnable(Boolean enable) {
this.enable = enable;
public String getUsername() {
return StringUtils.isEmpty(this.username) ? DEFAULT_USERNAME : this.username;
}
public Integer getRetryMinutes() {
return NumberTools.nullOrLessThan(this.retryMinutes, 0) ? DEFAULT_RETRYMINUTES : this.retryMinutes;
public String getPath() {
return StringUtils.isEmpty(this.path) ? DEFAULT_PATH : this.path;
}
}
public void setRetryMinutes(Integer retryMinutes) {
this.retryMinutes = retryMinutes;
public static class UpdateTable extends ConfigObject {
private static final long serialVersionUID = -7066262450518673067L;
public static UpdateTable defaultInstance() {
return new UpdateTable();
}
public Integer getThresholdMinutes() {
return NumberTools.nullOrLessThan(this.thresholdMinutes, 0) ? DEFAULT_THRESHOLDMINUTES : this.thresholdMinutes;
public static final Boolean DEFAULT_ENABLE = true;
public static final String DEFAULT_CRON = "20 20 * * * ?";
@FieldDescribe("是否启用")
private Boolean enable = DEFAULT_ENABLE;
@FieldDescribe("定时cron表达式.")
private String cron = DEFAULT_CRON;
public String getCron() {
if (StringUtils.isNotEmpty(this.cron) && CronExpression.isValidExpression(this.cron)) {
return this.cron;
} else {
return DEFAULT_CRON;
}
}
public void setThresholdMinutes(Integer thresholdMinutes) {
this.thresholdMinutes = thresholdMinutes;
public Boolean getEnable() {
return BooleanUtils.isTrue(this.enable);
}
}
......
......@@ -191,7 +191,7 @@ public class MessageConnector {
public static final String CONSUME_WELINK = "welink";
public static final String CONSUME_ZHENGWUDINGDING = "zhengwuDingding";
public static final String CONSUME_ZHENGWUDINGDING = "zhengwudingding";
public static final String CONSUME_QIYEWEIXIN = "qiyeweixin";
......@@ -199,7 +199,7 @@ public class MessageConnector {
public static final String CONSUME_KAFKA = "kafka";
public static final String CONSUME_ACTIVEMQ = "activeMq";
public static final String CONSUME_ACTIVEMQ = "activemq";
// restful类型
public static final String CONSUME_RESTFUL = "restful";
// 邮件类型
......
......@@ -25,16 +25,15 @@
<groupId>o2oa</groupId>
<artifactId>x_message_core_entity</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.14.5</version>
<version>5.17.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
......
......@@ -26,7 +26,7 @@ import com.google.gson.Gson;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.project.config.Message.ActiveMqConsumer;
import com.x.base.core.project.config.Message.ActivemqConsumer;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
......@@ -36,14 +36,14 @@ import com.x.base.core.project.tools.ListTools;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
public class ActiveMqConsumeQueue extends AbstractQueue<Message> {
public class ActivemqConsumeQueue extends AbstractQueue<Message> {
private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMqConsumeQueue.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ActivemqConsumeQueue.class);
private static final Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
if (null != message) {
update(message);
}
List<String> ids = listOverStay();
......@@ -53,9 +53,7 @@ public class ActiveMqConsumeQueue extends AbstractQueue<Message> {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
update(message);
}
}
}
......@@ -72,8 +70,8 @@ public class ActiveMqConsumeQueue extends AbstractQueue<Message> {
private void update(Message message) {
try {
ActiveMqConsumer consumer = gson.fromJson(message.getProperties().getConsumerJsonElement(),
ActiveMqConsumer.class);
ActivemqConsumer consumer = gson.fromJson(message.getProperties().getConsumerJsonElement(),
ActivemqConsumer.class);
producer(message, consumer);
success(message.getId());
} catch (Exception e) {
......@@ -82,7 +80,7 @@ public class ActiveMqConsumeQueue extends AbstractQueue<Message> {
}
}
private void producer(Message message, ActiveMqConsumer consumer) throws JMSException {
private void producer(Message message, ActivemqConsumer consumer) throws JMSException {
ActiveMQConnectionFactory connectionFactory;
......@@ -92,7 +90,7 @@ public class ActiveMqConsumeQueue extends AbstractQueue<Message> {
} else {
connectionFactory = new ActiveMQConnectionFactory(consumer.getUrl());
}
connectionFactory.setTrustedPackages(ListTools.toList(ActiveMqConsumeQueue.class.getPackage().getName()));
connectionFactory.setTrustedPackages(ListTools.toList(ActivemqConsumeQueue.class.getPackage().getName()));
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
......
......@@ -41,7 +41,7 @@ public class ApiConsumeQueue extends AbstractQueue<Message> {
private static Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
if (null != message) {
update(message);
}
List<String> ids = listOverStay();
......@@ -51,9 +51,7 @@ public class ApiConsumeQueue extends AbstractQueue<Message> {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
update(message);
}
}
}
......
......@@ -42,7 +42,8 @@ public class HadoopConsumeQueue extends AbstractQueue<Message> {
private static final Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
LOGGER.debug("execute message:{}.", message::toString);
if (null != message) {
update(message);
}
List<String> ids = listOverStay();
......@@ -52,9 +53,7 @@ public class HadoopConsumeQueue extends AbstractQueue<Message> {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
update(message);
}
}
}
......@@ -75,9 +74,14 @@ public class HadoopConsumeQueue extends AbstractQueue<Message> {
HadoopConsumer.class);
try (FileSystem fileSystem = FileSystem.get(configuration(consumer));
InputStream inputStream = new ByteArrayInputStream(
message.getBody().getBytes(StandardCharsets.UTF_8));
FSDataOutputStream outputStream = fileSystem.create(path(message, consumer))) {
inputStream.transferTo(outputStream);
message.getBody().getBytes(StandardCharsets.UTF_8))) {
org.apache.hadoop.fs.Path path = path(message, consumer);
if (fileSystem.exists(path)) {
fileSystem.delete(path, false);
}
try (FSDataOutputStream outputStream = fileSystem.create(path)) {
inputStream.transferTo(outputStream);
}
}
success(message.getId());
} catch (Exception e) {
......@@ -97,10 +101,10 @@ public class HadoopConsumeQueue extends AbstractQueue<Message> {
}
if (StringUtils.isNotEmpty(message.getPerson())) {
path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path(message.getPerson()));
} else {
path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path("default"));
}
if (StringUtils.isNotEmpty(message.getTitle())) {
path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path(message.getTitle()));
}
path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path(message.getId()));
return path;
}
......
......@@ -52,7 +52,7 @@ public class JdbcConsumeQueue extends AbstractQueue<Message> {
private static final Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
if (null != message) {
update(message);
}
List<String> ids = listOverStay();
......@@ -62,9 +62,7 @@ public class JdbcConsumeQueue extends AbstractQueue<Message> {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
update(message);
}
}
}
......
......@@ -40,7 +40,7 @@ public class KafkaConsumeQueue extends AbstractQueue<Message> {
private static final Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
if (null != message) {
update(message);
}
List<String> ids = listOverStay();
......@@ -50,9 +50,7 @@ public class KafkaConsumeQueue extends AbstractQueue<Message> {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
update(message);
}
}
}
......
......@@ -41,7 +41,7 @@ public class MailConsumeQueue extends AbstractQueue<Message> {
private static final Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
if (null != message) {
update(message);
}
List<String> ids = listOverStay();
......@@ -51,9 +51,7 @@ public class MailConsumeQueue extends AbstractQueue<Message> {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
update(message);
}
}
}
......
......@@ -29,9 +29,9 @@ import com.x.organization.core.entity.Person;
* 发送微信公众号模版消息 Created by fancyLou on 3/11/21. Copyright © 2021 O2. All rights
* reserved.
*/
public class MpWeixinConsumeQueue extends AbstractQueue<Message> {
public class MpweixinConsumeQueue extends AbstractQueue<Message> {
private static final Logger LOGGER = LoggerFactory.getLogger(MpWeixinConsumeQueue.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MpweixinConsumeQueue.class);
private static final Gson gson = XGsonBuilder.instance();
......
......@@ -12,7 +12,7 @@ import com.x.base.core.project.queue.AbstractQueue;
import com.x.message.assemble.communicate.message.PmsInnerMessage;
import com.x.message.core.entity.Message;
public class PmsInnerConsumeQueue extends AbstractQueue<Message> {
public class PmsinnerConsumeQueue extends AbstractQueue<Message> {
protected void execute(Message message) throws Exception {
Application app = ThisApplication.context().applications()
......
......@@ -44,7 +44,7 @@ public class RestfulConsumeQueue extends AbstractQueue<Message> {
private static WebservicesClient client = new WebservicesClient();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
if (null != message) {
update(message);
}
List<String> ids = listOverStay();
......@@ -54,9 +54,7 @@ public class RestfulConsumeQueue extends AbstractQueue<Message> {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
update(message);
}
}
}
......
......@@ -12,7 +12,6 @@ import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import com.google.gson.Gson;
......@@ -39,7 +38,7 @@ public class TableConsumeQueue extends AbstractQueue<Message> {
private static final Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
if (null != message) {
update(message);
}
List<String> ids = listOverStay();
......@@ -49,9 +48,7 @@ public class TableConsumeQueue extends AbstractQueue<Message> {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
update(message);
}
}
}
......
......@@ -32,17 +32,17 @@ public class ThisApplication {
public static final QiyeweixinConsumeQueue qiyeweixinConsumeQueue = new QiyeweixinConsumeQueue();
public static final ZhengwuDingdingConsumeQueue zhengwuDingdingConsumeQueue = new ZhengwuDingdingConsumeQueue();
public static final ZhengwudingdingConsumeQueue zhengwudingdingConsumeQueue = new ZhengwudingdingConsumeQueue();
public static final DingdingConsumeQueue dingdingConsumeQueue = new DingdingConsumeQueue();
public static final WeLinkConsumeQueue weLinkConsumeQueue = new WeLinkConsumeQueue();
public static final WelinkConsumeQueue welinkConsumeQueue = new WelinkConsumeQueue();
public static final PmsInnerConsumeQueue pmsInnerConsumeQueue = new PmsInnerConsumeQueue();
public static final PmsinnerConsumeQueue pmsinnerConsumeQueue = new PmsinnerConsumeQueue();
public static final MpWeixinConsumeQueue mpWeixinConsumeQueue = new MpWeixinConsumeQueue();
public static final MpweixinConsumeQueue mpweixinConsumeQueue = new MpweixinConsumeQueue();
public static final ActiveMqConsumeQueue activeMqConsumeQueue = new ActiveMqConsumeQueue();
public static final ActivemqConsumeQueue activemqConsumeQueue = new ActivemqConsumeQueue();
public static final KafkaConsumeQueue kafkaConsumeQueue = new KafkaConsumeQueue();
......@@ -87,7 +87,7 @@ public class ThisApplication {
private static void startQueue() throws Exception {
context().startQueue(kafkaConsumeQueue);
context().startQueue(activeMqConsumeQueue);
context().startQueue(activemqConsumeQueue);
context().startQueue(restfulConsumeQueue);
context().startQueue(apiConsumeQueue);
context().startQueue(mailConsumeQueue);
......@@ -100,20 +100,20 @@ public class ThisApplication {
}
if (BooleanUtils.isTrue(Config.zhengwuDingding().getEnable())
&& BooleanUtils.isTrue(Config.zhengwuDingding().getMessageEnable())) {
context().startQueue(zhengwuDingdingConsumeQueue);
context().startQueue(zhengwudingdingConsumeQueue);
}
if (Config.dingding().getEnable() && Config.dingding().getMessageEnable()) {
context().startQueue(dingdingConsumeQueue);
}
if (BooleanUtils.isTrue(Config.pushConfig().getEnable())) {
context().startQueue(pmsInnerConsumeQueue);
context().startQueue(pmsinnerConsumeQueue);
}
if (Config.weLink().getEnable() && Config.weLink().getMessageEnable()) {
context().startQueue(weLinkConsumeQueue);
context().startQueue(welinkConsumeQueue);
}
if (BooleanUtils.isTrue(Config.mPweixin().getEnable())
&& BooleanUtils.isTrue(Config.mPweixin().getMessageEnable())) {
context().startQueue(mpWeixinConsumeQueue);
context().startQueue(mpweixinConsumeQueue);
}
if (BooleanUtils.isTrue(Config.communicate().wsEnable())) {
context().startQueue(wsConsumeQueue);
......
......@@ -24,9 +24,9 @@ import com.x.message.assemble.communicate.message.WeLinkMessage;
import com.x.message.core.entity.Message;
import com.x.organization.core.entity.Person;
public class WeLinkConsumeQueue extends AbstractQueue<Message> {
public class WelinkConsumeQueue extends AbstractQueue<Message> {
private static final Logger LOGGER = LoggerFactory.getLogger(WeLinkConsumeQueue.class);
private static final Logger LOGGER = LoggerFactory.getLogger(WelinkConsumeQueue.class);
protected void execute(Message message) throws Exception {
......
......@@ -10,9 +10,9 @@ import com.x.base.core.project.queue.AbstractQueue;
import com.x.message.assemble.communicate.message.ZhengwuDingdingMessage;
import com.x.message.core.entity.Message;
public class ZhengwuDingdingConsumeQueue extends AbstractQueue<Message> {
public class ZhengwudingdingConsumeQueue extends AbstractQueue<Message> {
private static Logger logger = LoggerFactory.getLogger(ZhengwuDingdingConsumeQueue.class);
private static Logger logger = LoggerFactory.getLogger(ZhengwudingdingConsumeQueue.class);
protected void execute(Message message) throws Exception {
......
......@@ -149,7 +149,8 @@ class ActionCreate extends BaseAction {
@Deprecated
private Message assembleMessage(String consumer, Wi cpWi, Instant instant) throws Exception {
Message message = null;
switch (Objects.toString(consumer, "")) {
String type = StringUtils.lowerCase(Objects.toString(consumer, ""));
switch (type) {
case MessageConnector.CONSUME_WS:
message = this.wsMessage(cpWi, instant);
break;
......@@ -184,7 +185,7 @@ class ActionCreate extends BaseAction {
@Deprecated
private void sendMessage(List<Message> messages) throws Exception {
for (Message message : messages) {
switch (message.getConsumer()) {
switch (StringUtils.lowerCase(message.getConsumer())) {
case MessageConnector.CONSUME_WS:
sendMessageWs(message);
break;
......@@ -219,14 +220,14 @@ class ActionCreate extends BaseAction {
private void sendMessageMPWeixin(Message message) throws Exception {
if (BooleanUtils.isTrue(Config.mPweixin().getEnable())
&& BooleanUtils.isTrue(Config.mPweixin().getMessageEnable())) {
ThisApplication.mpWeixinConsumeQueue.send(message);
ThisApplication.mpweixinConsumeQueue.send(message);
}
}
@Deprecated
private void sendMessagePmsInner(Message message) throws Exception {
if (BooleanUtils.isTrue(Config.pushConfig().getEnable())) {
ThisApplication.pmsInnerConsumeQueue.send(message);
ThisApplication.pmsinnerConsumeQueue.send(message);
}
}
......@@ -243,7 +244,7 @@ class ActionCreate extends BaseAction {
private void sendMessageZhengwuDingding(Message message) throws Exception {
if (BooleanUtils.isTrue(Config.zhengwuDingding().getEnable())
&& BooleanUtils.isTrue(Config.zhengwuDingding().getMessageEnable())) {
ThisApplication.zhengwuDingdingConsumeQueue.send(message);
ThisApplication.zhengwudingdingConsumeQueue.send(message);
}
}
......@@ -251,7 +252,7 @@ class ActionCreate extends BaseAction {
private void sendMessageWeLink(Message message) throws Exception {
if (BooleanUtils.isTrue(Config.weLink().getEnable())
&& BooleanUtils.isTrue(Config.weLink().getMessageEnable())) {
ThisApplication.weLinkConsumeQueue.send(message);
ThisApplication.welinkConsumeQueue.send(message);
}
}
......@@ -485,7 +486,7 @@ class ActionCreate extends BaseAction {
private Message v3AssembleMessage(Wi wi, Consumer consumer, Instant instant) {
Message message = null;
String type = Objects.toString(consumer.getType(), "");
String type = StringUtils.lowerCase(Objects.toString(consumer.getType(), ""));
switch (type) {
case MessageConnector.CONSUME_WS:
message = v3WsMessage(wi, consumer);
......@@ -565,7 +566,7 @@ class ActionCreate extends BaseAction {
}
message.setType(wi.getType());
message.setConsumed(false);
message.setConsumer(wi.getType());
message.setConsumer(consumer.getType());
message.getProperties().setConsumerJsonElement(gson.toJsonTree(consumer));
return message;
}
......@@ -756,7 +757,8 @@ class ActionCreate extends BaseAction {
private void v3SendMessage(List<Message> messages) throws Exception {
for (Message message : messages) {
switch (message.getConsumer()) {
String type = StringUtils.lowerCase(message.getConsumer());
switch (type) {
case MessageConnector.CONSUME_WS:
ThisApplication.wsConsumeQueue.send(message);
break;
......@@ -767,25 +769,25 @@ class ActionCreate extends BaseAction {
ThisApplication.dingdingConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_WELINK:
ThisApplication.weLinkConsumeQueue.send(message);
ThisApplication.welinkConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_ZHENGWUDINGDING:
ThisApplication.zhengwuDingdingConsumeQueue.send(message);
ThisApplication.zhengwudingdingConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_QIYEWEIXIN:
ThisApplication.qiyeweixinConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_PMS_INNER:
ThisApplication.pmsInnerConsumeQueue.send(message);
ThisApplication.pmsinnerConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_MPWEIXIN:
ThisApplication.mpWeixinConsumeQueue.send(message);
ThisApplication.mpweixinConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_KAFKA:
ThisApplication.kafkaConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_ACTIVEMQ:
ThisApplication.activeMqConsumeQueue.send(message);
ThisApplication.activemqConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_RESTFUL:
ThisApplication.restfulConsumeQueue.send(message);
......
......@@ -81,7 +81,7 @@ public class ActionMsgCreate extends BaseAction {
message.setType(MessageConnector.TYPE_IM_CREATE);
message.setId("");
if (BooleanUtils.isTrue(Config.pushConfig().getEnable())) {
ThisApplication.pmsInnerConsumeQueue.send(message);
ThisApplication.pmsinnerConsumeQueue.send(message);
}
}
} catch (Exception e) {
......
......@@ -16,7 +16,7 @@ public class TriggerMessageConsumeQueue extends AbstractJob {
LOGGER.debug(
"schedule trigger kafkaConsumeQueue, activeMqConsumeQueue, restfulConsumeQueue, mailConsumeQueue, apiConsumeQueue, jdbcConsumeQueue, tableConsumeQueue, hadoopConsumeQueue.");
ThisApplication.kafkaConsumeQueue.send(null);
ThisApplication.activeMqConsumeQueue.send(null);
ThisApplication.activemqConsumeQueue.send(null);
ThisApplication.restfulConsumeQueue.send(null);
ThisApplication.mailConsumeQueue.send(null);
ThisApplication.apiConsumeQueue.send(null);
......
......@@ -47,8 +47,6 @@ public class Message extends SliceJpaObject {
@Column(length = length_id, name = ColumnNamePrefix + id_FIELDNAME)
private String id = createId();
/* 以上为 JpaObject 默认字段 */
public void onPersist() throws Exception {
// nothing
}
......@@ -113,13 +111,6 @@ public class Message extends SliceJpaObject {
@CheckPersist(allowEmpty = false)
private String instant;
public static final String ITEM_FIELDNAME = "item";
@FieldDescribe("配置项.")
@Column(length = JpaObject.length_255B, name = ColumnNamePrefix + ITEM_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + ITEM_FIELDNAME)
@CheckPersist(allowEmpty = true)
private String item;
public static final String PROPERTIES_FIELDNAME = "properties";
@FieldDescribe("属性对象存储字段.")
@Persistent
......@@ -180,14 +171,6 @@ public class Message extends SliceJpaObject {
return instant;
}
public String getItem() {
return item;
}
public void setItem(String item) {
this.item = item;
}
public void setInstant(String instant) {
this.instant = instant;
}
......
......@@ -32,6 +32,8 @@ public class Event extends SliceJpaObject {
public static final String EVENTTYPE_UPDATETABLE = "updateTable";
public static final String EVENTTYPE_ARCHIVEHADOOP = "archiveHadoop";
public String getId() {
return id;
}
......
package com.x.processplatform.service.processing;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.gson.Gson;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.entity.dataitem.DataItem;
import com.x.base.core.entity.dataitem.DataItemConverter;
import com.x.base.core.entity.dataitem.ItemCategory;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.ProcessPlatform.ArchiveHadoop;
import com.x.base.core.project.config.StorageMapping;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.queue.AbstractQueue;
import com.x.base.core.project.tools.StringTools;
import com.x.processplatform.core.entity.content.Attachment;
import com.x.processplatform.core.entity.content.ReadCompleted;
import com.x.processplatform.core.entity.content.Record;
import com.x.processplatform.core.entity.content.Review;
import com.x.processplatform.core.entity.content.TaskCompleted;
import com.x.processplatform.core.entity.content.WorkCompleted;
import com.x.processplatform.core.entity.content.WorkLog;
import com.x.processplatform.core.entity.message.Event;
import com.x.processplatform.core.entity.message.Event_;
import com.x.query.core.entity.Item;
/**
 * Queue that archives a completed work (WorkCompleted and all of its related
 * entities plus attachment binaries) into HDFS as a directory of JSON files.
 * Driven by {@link Event} rows of type {@link Event#EVENTTYPE_ARCHIVEHADOOP}:
 * a successful archive deletes the event, a failure increments its failure
 * counter so it is retried later by the overstay sweep.
 */
public class ArchiveHadoopQueue extends AbstractQueue<String> {

	private static final Logger LOGGER = LoggerFactory.getLogger(ArchiveHadoopQueue.class);

	// Hadoop configuration key naming the default file system URI.
	private static final String ATTRIBUTE_FS_DEFAULTFS = "fs.defaultFS";
	// System property read by the Hadoop client to choose the remote user.
	private static final String SYSTEM_PROPERTY_HADOOP_USER_NAME = "HADOOP_USER_NAME";
	// A failed event is retried once it has been untouched for this many minutes.
	private static final int RETRYMINUTES = 60;
	// Events older than this many minutes (3 days) are discarded by clean().
	private static final int THRESHOLDMINUTES = 60 * 24 * 3;

	private final Gson gson = XGsonBuilder.compactInstance();
	private final DataItemConverter<Item> converter = new DataItemConverter<>(Item.class);

	/**
	 * Queue entry point. A non-empty id archives that single event; afterwards
	 * overstayed (previously failed) events are retried and, if any were found,
	 * expired events are cleaned up.
	 *
	 * @param id event id to archive, or empty/null to run only the retry sweep
	 */
	protected void execute(String id) throws Exception {
		if (StringUtils.isNotEmpty(id)) {
			archive(id);
		}
		List<String> ids = this.checkOverstay();
		if (!ids.isEmpty()) {
			for (String s : ids) {
				archive(s);
			}
			clean();
		}
	}

	/**
	 * Archives the event with the given id if it exists and is of the
	 * archive-to-hadoop type.
	 *
	 * @return true when the transfer succeeded and the event was removed
	 */
	private boolean archive(String id) throws Exception {
		Event event = exist(id);
		if ((null != event) && StringUtils.equals(event.getType(), Event.EVENTTYPE_ARCHIVEHADOOP)) {
			if (transfer(event)) {
				success(id);
				// Original always returned false; report success to the caller.
				return true;
			} else {
				failure(id);
				LOGGER.warn("归档到Hadoop失败:{}.", () -> id);
			}
		}
		return false;
	}

	/**
	 * Loads the completed work and every related entity for the event's job,
	 * then writes them all (plus attachment content) into a per-work HDFS
	 * directory, replacing any previous archive of the same work.
	 *
	 * @return true on success, false when any step threw
	 */
	private boolean transfer(Event event) throws Exception {
		WorkCompleted workCompleted = null;
		List<Item> itemList = null;
		List<TaskCompleted> taskCompletedList = null;
		List<ReadCompleted> readCompletedList = null;
		List<Review> reviewList = null;
		List<WorkLog> workLogList = null;
		List<Record> recordList = null;
		List<Attachment> attachmentList = null;
		try {
			// Read everything inside one container so the snapshot is consistent.
			try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
				workCompleted = emc.firstEqual(WorkCompleted.class, WorkCompleted.job_FIELDNAME, event.getJob());
				Business business = new Business(emc);
				if (null != workCompleted) {
					itemList = business.entityManagerContainer().listEqualAndEqual(Item.class,
							DataItem.bundle_FIELDNAME, workCompleted.getJob(), DataItem.itemCategory_FIELDNAME,
							ItemCategory.pp);
					taskCompletedList = this.listTaskCompleted(business, workCompleted.getJob());
					readCompletedList = this.listReadCompleted(business, workCompleted.getJob());
					reviewList = this.listReview(business, workCompleted.getJob());
					workLogList = this.listWorkLog(business, workCompleted.getJob());
					recordList = this.listRecord(business, workCompleted.getJob());
					attachmentList = this.listAttachment(business, workCompleted.getJob());
				}
			}
			if (null != workCompleted) {
				ArchiveHadoop archiveHadoop = Config.processPlatform().getArchiveHadoop();
				org.apache.hadoop.fs.Path dir = dir(archiveHadoop, workCompleted);
				try (FileSystem fileSystem = FileSystem.get(configuration(archiveHadoop))) {
					// Re-archiving: drop any previous directory for this work first.
					if (fileSystem.exists(dir)) {
						fileSystem.delete(dir, true);
					}
					transferWorkCompleted(fileSystem, dir, workCompleted);
					transferData(fileSystem, dir, itemList);
					transferTaskCompleteds(fileSystem, dir, taskCompletedList);
					transferReadCompleteds(fileSystem, dir, readCompletedList);
					transferReviews(fileSystem, dir, reviewList);
					transferRecords(fileSystem, dir, recordList);
					transferWorkLogs(fileSystem, dir, workLogList);
					transferAttachments(fileSystem, dir, attachmentList);
				}
			}
			return true;
		} catch (Exception e) {
			LOGGER.error(e);
		}
		return false;
	}

	/** Writes a single object as one JSON document into {@code dir/name}. */
	private void writeJson(FileSystem fileSystem, Path dir, String name, Object object) throws IOException {
		Path path = new Path(dir, name);
		try (FSDataOutputStream out = fileSystem.create(path);
				OutputStreamWriter outputStreamWriter = new OutputStreamWriter(out, StandardCharsets.UTF_8);
				BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter)) {
			bufferedWriter.write(this.gson.toJson(object));
		}
	}

	/**
	 * Writes each element of the list as one JSON document per line (JSON
	 * Lines) into {@code dir/name}.
	 */
	private <T> void writeJsonLines(FileSystem fileSystem, Path dir, String name, List<T> list) throws IOException {
		Path path = new Path(dir, name);
		try (FSDataOutputStream out = fileSystem.create(path);
				OutputStreamWriter outputStreamWriter = new OutputStreamWriter(out, StandardCharsets.UTF_8);
				BufferedWriter bufferedWriter = new BufferedWriter(outputStreamWriter)) {
			for (int i = 0; i < list.size(); i++) {
				if (i > 0) {
					bufferedWriter.write(StringTools.LF);
				}
				bufferedWriter.write(this.gson.toJson(list.get(i)));
			}
		}
	}

	private void transferWorkCompleted(FileSystem fileSystem, Path dir, WorkCompleted workCompleted)
			throws IOException {
		writeJson(fileSystem, dir, "workCompleted.json", workCompleted);
	}

	private void transferData(FileSystem fileSystem, Path dir, List<Item> itemList) throws IOException {
		// Items are re-assembled into the original nested data object before writing.
		writeJson(fileSystem, dir, "data.json", converter.assemble(itemList));
	}

	private void transferTaskCompleteds(FileSystem fileSystem, Path dir, List<TaskCompleted> taskCompletedList)
			throws IOException {
		writeJsonLines(fileSystem, dir, "taskCompleteds.json", taskCompletedList);
	}

	private void transferReadCompleteds(FileSystem fileSystem, Path dir, List<ReadCompleted> readCompletedList)
			throws IOException {
		writeJsonLines(fileSystem, dir, "readCompleteds.json", readCompletedList);
	}

	private void transferReviews(FileSystem fileSystem, Path dir, List<Review> reviewList) throws IOException {
		writeJsonLines(fileSystem, dir, "reviews.json", reviewList);
	}

	private void transferRecords(FileSystem fileSystem, Path dir, List<Record> recordList) throws IOException {
		writeJsonLines(fileSystem, dir, "records.json", recordList);
	}

	private void transferWorkLogs(FileSystem fileSystem, Path dir, List<WorkLog> workLogList) throws IOException {
		writeJsonLines(fileSystem, dir, "workLogs.json", workLogList);
	}

	/**
	 * Writes attachment metadata as JSON Lines plus one binary file per
	 * attachment ({@code attachment_<id>}) read from its storage mapping.
	 */
	private void transferAttachments(FileSystem fileSystem, Path dir, List<Attachment> attachmentList)
			throws Exception {
		writeJsonLines(fileSystem, dir, "attachments.json", attachmentList);
		for (Attachment attachment : attachmentList) {
			Path attachmentPath = new Path(dir, "attachment_" + attachment.getId());
			try (FSDataOutputStream out = fileSystem.create(attachmentPath)) {
				StorageMapping mapping = ThisApplication.context().storageMappings().get(Attachment.class,
						attachment.getStorage());
				// Mapping may have been removed; skip content but keep the (empty) file.
				if (null != mapping) {
					byte[] bytes = attachment.readContent(mapping);
					IOUtils.write(bytes, out);
				}
			}
		}
	}

	/** Builds the Hadoop client configuration from the archive settings. */
	private org.apache.hadoop.conf.Configuration configuration(ArchiveHadoop archiveHadoop) {
		if (StringUtils.isNotEmpty(archiveHadoop.getUsername())) {
			// Hadoop picks up the remote user from this system property.
			System.setProperty(SYSTEM_PROPERTY_HADOOP_USER_NAME, archiveHadoop.getUsername());
		}
		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
		configuration.set(ATTRIBUTE_FS_DEFAULTFS, archiveHadoop.getFsDefaultFS());
		return configuration;
	}

	/**
	 * Target directory: {@code <base>/<application>/<process>/<id[0:2]>/<id[2:4]>/<id>},
	 * sharded by id prefix to keep directory fan-out bounded.
	 */
	private org.apache.hadoop.fs.Path dir(ArchiveHadoop archiveHadoop, WorkCompleted workCompleted) {
		org.apache.hadoop.fs.Path path;
		if (StringUtils.isEmpty(archiveHadoop.getPath())) {
			path = new org.apache.hadoop.fs.Path(Path.SEPARATOR);
		} else if (StringUtils.startsWith(archiveHadoop.getPath(), Path.SEPARATOR)) {
			path = new org.apache.hadoop.fs.Path(archiveHadoop.getPath());
		} else {
			path = new org.apache.hadoop.fs.Path(Path.SEPARATOR + archiveHadoop.getPath());
		}
		path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path(workCompleted.getApplication()));
		path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path(workCompleted.getProcess()));
		String id = workCompleted.getId();
		path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path(id.substring(0, 2)));
		path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path(id.substring(2, 4)));
		path = new org.apache.hadoop.fs.Path(path, new org.apache.hadoop.fs.Path(id));
		return path;
	}

	private List<TaskCompleted> listTaskCompleted(Business business, String job) throws Exception {
		return business.entityManagerContainer().listEqual(TaskCompleted.class, TaskCompleted.job_FIELDNAME, job);
	}

	private List<ReadCompleted> listReadCompleted(Business business, String job) throws Exception {
		return business.entityManagerContainer().listEqual(ReadCompleted.class, ReadCompleted.job_FIELDNAME, job);
	}

	private List<Review> listReview(Business business, String job) throws Exception {
		return business.entityManagerContainer().listEqual(Review.class, Review.job_FIELDNAME, job);
	}

	private List<Record> listRecord(Business business, String job) throws Exception {
		// Use Record's own field constant (original mistakenly referenced Review's).
		return business.entityManagerContainer().listEqual(Record.class, Record.job_FIELDNAME, job);
	}

	private List<WorkLog> listWorkLog(Business business, String job) throws Exception {
		// Use WorkLog's own field constant (original mistakenly referenced Review's).
		return business.entityManagerContainer().listEqual(WorkLog.class, WorkLog.job_FIELDNAME, job);
	}

	private List<Attachment> listAttachment(Business business, String job) throws Exception {
		// Use Attachment's own field constant (original referenced ReadCompleted's).
		return business.entityManagerContainer().listEqual(Attachment.class, Attachment.job_FIELDNAME, job);
	}

	/** Looks up the event; returns null when missing or on any lookup error. */
	private Event exist(String id) {
		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
			return emc.find(id, Event.class);
		} catch (Exception e) {
			LOGGER.error(e);
		}
		return null;
	}

	/** Deletes the event after a successful archive. */
	private void success(String id) throws Exception {
		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
			Event event = emc.find(id, Event.class);
			if (null != event) {
				emc.beginTransaction(Event.class);
				emc.remove(event);
				emc.commit();
			}
		} catch (Exception e) {
			LOGGER.error(e);
		}
	}

	/** Increments the event's failure counter so the overstay sweep retries it. */
	private void failure(String id) {
		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
			Event event = emc.find(id, Event.class);
			if (null != event) {
				emc.beginTransaction(Event.class);
				Integer failure = event.getFailure();
				failure = (failure == null) ? 1 : failure + 1;
				event.setFailure(failure);
				emc.commit();
			}
		} catch (Exception e) {
			LOGGER.error(e);
		}
	}

	/**
	 * Finds up to 100 archive events not updated within {@link #RETRYMINUTES},
	 * i.e. earlier failures that are due for a retry.
	 */
	private List<String> checkOverstay() throws Exception {
		List<String> list = new ArrayList<>();
		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
			EntityManager em = emc.get(Event.class);
			CriteriaBuilder cb = em.getCriteriaBuilder();
			CriteriaQuery<String> cq = cb.createQuery(String.class);
			Root<Event> root = cq.from(Event.class);
			Predicate p = cb.equal(root.get(Event_.type), Event.EVENTTYPE_ARCHIVEHADOOP);
			p = cb.and(p, cb.lessThanOrEqualTo(root.get(JpaObject_.updateTime),
					DateUtils.addMinutes(new Date(), -RETRYMINUTES)));
			list.addAll(em.createQuery(cq.select(root.get(Event_.id)).where(p)).setMaxResults(100).getResultList());
		}
		if (!list.isEmpty()) {
			LOGGER.info("查找到 {} 条处理失败的归档到hadoop事件.", list::size);
		}
		return list;
	}

	/**
	 * Deletes up to 100 archive events older than {@link #THRESHOLDMINUTES}
	 * (persistently failing ones that will never succeed).
	 */
	private void clean() throws Exception {
		List<String> list = new ArrayList<>();
		try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
			EntityManager em = emc.get(Event.class);
			CriteriaBuilder cb = em.getCriteriaBuilder();
			CriteriaQuery<String> cq = cb.createQuery(String.class);
			Root<Event> root = cq.from(Event.class);
			Predicate p = cb.equal(root.get(Event_.type), Event.EVENTTYPE_ARCHIVEHADOOP);
			// THRESHOLDMINUTES is a minute count; the original used addDays here,
			// which pushed the cutoff ~12 years back so nothing was ever cleaned.
			p = cb.and(p, cb.lessThanOrEqualTo(root.get(JpaObject_.createTime),
					DateUtils.addMinutes(new Date(), -THRESHOLDMINUTES)));
			list.addAll(em.createQuery(cq.select(root.get(Event_.id)).where(p)).setMaxResults(100).getResultList());
			if (!list.isEmpty()) {
				emc.beginTransaction(Event.class);
				for (String id : list) {
					Event event = emc.find(id, Event.class);
					if (null != event) {
						emc.remove(event);
					}
				}
				emc.commit();
			}
		}
		if (!list.isEmpty()) {
			LOGGER.info("删除 {} 条超期的归档到hadoop事件.", list::size);
		}
	}
}
\ No newline at end of file
......@@ -52,6 +52,8 @@ public class ThisApplication {
public static final SyncJaxwsInvokeQueue syncJaxwsInvokeQueue = new SyncJaxwsInvokeQueue();
public static final UpdateTableQueue updateTableQueue = new UpdateTableQueue();
public static final ArchiveHadoopQueue archiveHadoopQueue = new ArchiveHadoopQueue();
private static ProcessingToProcessingSignalStack processingToProcessingSignalStack = new ProcessingToProcessingSignalStack();
......@@ -71,6 +73,7 @@ public class ThisApplication {
context().startQueue(syncJaxrsInvokeQueue);
context().startQueue(syncJaxwsInvokeQueue);
context().startQueue(updateTableQueue);
context().startQueue(archiveHadoopQueue);
if (BooleanUtils.isTrue(Config.processPlatform().getMerge().getEnable())) {
context.schedule(Merge.class, Config.processPlatform().getMerge().getCron());
}
......@@ -98,6 +101,9 @@ public class ThisApplication {
if (BooleanUtils.isTrue(Config.processPlatform().getUpdateTable().getEnable())) {
context.schedule(UpdateTable.class, Config.processPlatform().getUpdateTable().getCron());
}
if (BooleanUtils.isTrue(Config.processPlatform().getArchiveHadoop().getEnable())) {
context.schedule(UpdateTable.class, Config.processPlatform().getArchiveHadoop().getCron());
}
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -17,9 +17,9 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.project.Applications;
import com.x.base.core.project.x_query_service_processing;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.jaxrs.WrapBoolean;
import com.x.base.core.project.logger.Logger;
......@@ -36,6 +36,9 @@ public class UpdateTableQueue extends AbstractQueue<String> {
private Gson gson = XGsonBuilder.instance();
private static final int RETRYMINUTES = 20;
private static final int THRESHOLDMINUTES = 60 * 24 * 3;
protected void execute(String id) throws Exception {
if (StringUtils.isNotEmpty(id)) {
update(id);
......@@ -73,7 +76,7 @@ public class UpdateTableQueue extends AbstractQueue<String> {
data = new WorkDataHelper(emc, workCompleted).get();
}
}
if ((null != data) && (null != workCompleted)) {
if (null != data) {
JsonElement jsonElement = XGsonBuilder.merge(gson.toJsonTree(workCompleted), gson.toJsonTree(data));
WrapBoolean resp = ThisApplication.context().applications().postQuery(x_query_service_processing.class,
Applications.joinQueryUri("table", event.getTarget(), "update", event.getJob()), jsonElement)
......@@ -131,12 +134,12 @@ public class UpdateTableQueue extends AbstractQueue<String> {
CriteriaQuery<String> cq = cb.createQuery(String.class);
Root<Event> root = cq.from(Event.class);
Predicate p = cb.equal(root.get(Event_.type), Event.EVENTTYPE_UPDATETABLE);
p = cb.and(p, cb.lessThanOrEqualTo(root.get(Event_.updateTime),
DateUtils.addMinutes(new Date(), -Config.processPlatform().getUpdateTable().getRetryMinutes())));
p = cb.and(p, cb.lessThanOrEqualTo(root.get(JpaObject_.updateTime),
DateUtils.addMinutes(new Date(), -RETRYMINUTES)));
list.addAll(em.createQuery(cq.select(root.get(Event_.id)).where(p)).setMaxResults(100).getResultList());
}
if (!list.isEmpty()) {
LOGGER.info("查找到 {} 条处理失败的同步到自建表事件.", () -> list.size());
LOGGER.info("查找到 {} 条处理失败的同步到自建表事件.", list::size);
}
return list;
}
......@@ -149,8 +152,8 @@ public class UpdateTableQueue extends AbstractQueue<String> {
CriteriaQuery<String> cq = cb.createQuery(String.class);
Root<Event> root = cq.from(Event.class);
Predicate p = cb.equal(root.get(Event_.type), Event.EVENTTYPE_UPDATETABLE);
p = cb.and(p, cb.lessThanOrEqualTo(root.get(Event_.createTime), DateUtils.addMinutes(new Date(),
-Config.processPlatform().getUpdateTable().getThresholdMinutes())));
p = cb.and(p, cb.lessThanOrEqualTo(root.get(JpaObject_.createTime),
DateUtils.addMinutes(new Date(), -THRESHOLDMINUTES)));
list.addAll(em.createQuery(cq.select(root.get(Event_.id)).where(p)).setMaxResults(100).getResultList());
if (!list.isEmpty()) {
emc.beginTransaction(Event.class);
......@@ -164,7 +167,7 @@ public class UpdateTableQueue extends AbstractQueue<String> {
}
}
if (!list.isEmpty()) {
LOGGER.info("删除 {} 条超期的同步到自建表事件.", () -> list.size());
LOGGER.info("删除 {} 条超期的同步到自建表事件.", list::size);
}
}
}
\ No newline at end of file
......@@ -166,30 +166,55 @@ public class EndProcessor extends AbstractEndProcessor {
// 回写到父Work
tryUpdateParentWork(aeiObjects);
addUpdateTableEvent(aeiObjects);
addArchiveHadoopEvent(aeiObjects);
}
private void addUpdateTableEvent(AeiObjects aeiObjects) throws Exception {
if (BooleanUtils.isTrue(aeiObjects.getProcess().getUpdateTableEnable())
&& ListTools.isNotEmpty(aeiObjects.getProcess().getUpdateTableList())) {
List<Event> events = new ArrayList<>();
for (String table : aeiObjects.getProcess().getUpdateTableList()) {
if (StringUtils.isNotEmpty(table)) {
Event event = new Event();
event.setTarget(table);
event.setJob(aeiObjects.getWork().getJob());
event.setType(Event.EVENTTYPE_UPDATETABLE);
events.add(event);
}
}
if (!events.isEmpty()) {
private void addArchiveHadoopEvent(AeiObjects aeiObjects) {
try {
if (BooleanUtils.isTrue(Config.processPlatform().getArchiveHadoop().getEnable())) {
aeiObjects.entityManagerContainer().beginTransaction(Event.class);
for (Event event : events) {
aeiObjects.entityManagerContainer().persist(event, CheckPersistType.all);
}
Event event = new Event();
event.setJob(aeiObjects.getWork().getJob());
event.setType(Event.EVENTTYPE_ARCHIVEHADOOP);
aeiObjects.entityManagerContainer().persist(event, CheckPersistType.all);
aeiObjects.entityManagerContainer().commit();
for (Event event : events) {
ThisApplication.updateTableQueue.send(event.getId());
ThisApplication.updateTableQueue.send(event.getId());
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private void addUpdateTableEvent(AeiObjects aeiObjects) {
try {
if (BooleanUtils.isTrue(aeiObjects.getProcess().getUpdateTableEnable())
&& ListTools.isNotEmpty(aeiObjects.getProcess().getUpdateTableList())) {
List<Event> events = new ArrayList<>();
for (String table : aeiObjects.getProcess().getUpdateTableList()) {
if (StringUtils.isNotEmpty(table)) {
Event event = new Event();
event.setTarget(table);
event.setJob(aeiObjects.getWork().getJob());
event.setType(Event.EVENTTYPE_UPDATETABLE);
events.add(event);
}
}
sendUpdateTableEvent(aeiObjects, events);
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private void sendUpdateTableEvent(AeiObjects aeiObjects, List<Event> events) throws Exception {
if (!events.isEmpty()) {
aeiObjects.entityManagerContainer().beginTransaction(Event.class);
for (Event event : events) {
aeiObjects.entityManagerContainer().persist(event, CheckPersistType.all);
}
aeiObjects.entityManagerContainer().commit();
for (Event event : events) {
ThisApplication.updateTableQueue.send(event.getId());
}
}
}
......
package com.x.processplatform.service.processing.schedule;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.schedule.AbstractJob;
import com.x.processplatform.service.processing.ThisApplication;
/**
 * Quartz job fired on the configured archive-to-hadoop cron expression. Its
 * only duty is to poke {@code archiveHadoopQueue} with an empty payload so the
 * queue re-scans for pending and overstayed archive events.
 */
public class ArchiveHadoop extends AbstractJob {

	private static final Logger LOGGER = LoggerFactory.getLogger(ArchiveHadoop.class);

	@Override
	public void schedule(JobExecutionContext jobExecutionContext) throws Exception {
		LOGGER.debug("send archiveHadoopQueue signal.");
		try {
			// An empty id tells the queue to run only its retry/cleanup sweep.
			ThisApplication.archiveHadoopQueue.send("");
		} catch (Exception exception) {
			// Surface the failure to Quartz so it shows up in scheduler logs.
			throw new JobExecutionException(exception);
		}
	}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册