提交 d9d25f4d 编写于 作者: R Ray

add mail api queue

上级 fcca2319
[
{
"url":"jdbc:mysql://127.0.0.1:3306/X?autoReconnect=true&useSSL=false&useUnicode=true&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8",
"url":"jdbc:mysql://127.0.0.1:3306/X?autoReconnect=true&allowPublicKeyRetrieval=true&useSSL=false&useUnicode=true&characterEncoding=UTF-8&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8",
"username" : "root",
"password" :"password",
"includes": [],
......
......@@ -373,6 +373,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
......@@ -841,7 +845,6 @@
<artifactId>neuroph-contrib</artifactId>
<version>2.98</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
......@@ -1072,6 +1075,11 @@
<artifactId>hadoop-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>javax.mail</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
......@@ -1093,4 +1101,4 @@
<url>http://maven.o2oa.net/repository/o2oa-release/</url>
</repository>
</distributionManagement>
</project>
</project>
\ No newline at end of file
......@@ -34,9 +34,12 @@ import com.x.base.core.project.config.JpushConfig;
import com.x.base.core.project.config.LogLevel;
import com.x.base.core.project.config.MPweixin;
import com.x.base.core.project.config.Meeting;
import com.x.base.core.project.config.MessageApi;
import com.x.base.core.project.config.MessageMail;
import com.x.base.core.project.config.MessageMq;
import com.x.base.core.project.config.MessageRestful;
import com.x.base.core.project.config.Messages;
import com.x.base.core.project.config.Mock;
import com.x.base.core.project.config.Mq;
import com.x.base.core.project.config.Node;
import com.x.base.core.project.config.Organization;
import com.x.base.core.project.config.Person;
......@@ -84,9 +87,12 @@ public class CreateConfigSample {
classes.add(LogLevel.class);
classes.add(Meeting.class);
classes.add(Messages.class);
classes.add(MessageRestful.class);
classes.add(MessageMail.class);
classes.add(MessageMq.class);
classes.add(MessageApi.class);
classes.add(Mock.class);
classes.add(MPweixin.class);
classes.add(Mq.class);
classes.add(Node.class);
classes.add(Organization.class);
classes.add(Person.class);
......
......@@ -54,23 +54,24 @@ public class Communicate extends ConfigObject {
return BooleanUtils.isTrue(this.updateQueryTableEnable);
}
@FieldDescribe("定时触发发送到消息队列MQ.")
private CronMq cronMq;
@FieldDescribe("定时触发消息消费队列.")
private TriggerMessageConsumeQueue triggerMessageConsumeQueue;
public CronMq cronMq() {
return this.cronMq == null ? new CronMq() : this.cronMq;
public TriggerMessageConsumeQueue triggerMessageConsumeQueue() {
return this.triggerMessageConsumeQueue == null ? new TriggerMessageConsumeQueue()
: this.triggerMessageConsumeQueue;
}
public static class CronMq extends ConfigObject {
public static class TriggerMessageConsumeQueue extends ConfigObject {
private static final long serialVersionUID = 1559477154694423422L;
public static CronMq defaultInstance() {
return new CronMq();
public static TriggerMessageConsumeQueue defaultInstance() {
return new TriggerMessageConsumeQueue();
}
public static final Boolean DEFAULT_ENABLE = false;
public static final String DEFAULT_CRON = "0 0 * * * ?"; // 每小时运行一次
public static final Boolean DEFAULT_ENABLE = true;
public static final String DEFAULT_CRON = "20 20 * * * ?"; // 每小时运行一次
@FieldDescribe("是否启用")
private Boolean enable = DEFAULT_ENABLE;
......
......@@ -74,6 +74,10 @@ public class Config {
public static final String PATH_CONFIG_DUMPRESTORESTORAGE = "config/dumpRestoreStorage.json";
public static final String PATH_CONFIG_MESSAGES = "config/messages.json";
public static final String PATH_CONFIG_MESSAGES_SEND_RULE = "config/messageSendRule.js";
public static final String PATH_CONFIG_MESSAGERESTFUL = "config/messageRestful.json";
public static final String PATH_CONFIG_MESSAGEMQ = "config/messageMq.json";
public static final String PATH_CONFIG_MESSAGEMAIL = "config/messageMail.json";
public static final String PATH_CONFIG_MESSAGEAPI = "config/messageApi.json";
public static final String PATH_CONFIG_SSLKEYSTORE = "config/keystore";
public static final String PATH_CONFIG_SSLKEYSTORESAMPLE = "config/sample/keystore";
public static final String PATH_CONFIG_STARTIMAGE = "config/startImage.png";
......@@ -86,7 +90,6 @@ public class Config {
public static final String PATH_CONFIG_ZHENGWUDINGDING = "config/zhengwuDingding.json";
public static final String PATH_CONFIG_QIYEWEIXIN = "config/qiyeweixin.json";
public static final String PATH_CONFIG_MPWEIXIN = "config/mpweixin.json";
public static final String PATH_CONFIG_MQ = "config/mq.json";
public static final String PATH_CONFIG_LOGLEVEL = "config/logLevel.json";
public static final String PATH_CONFIG_BINDLOGO = "config/bindLogo.png";
public static final String PATH_CONFIG_SLICE = "config/slice.json";
......@@ -128,11 +131,6 @@ public class Config {
public static final String DIR_DYNAMIC = "dynamic";
public static final String DIR_DYNAMIC_JARS = "dynamic/jars";
public static final String DIR_JVM = "jvm";
// public static final String DIR_JVM_AIX = "jvm/aix";
// public static final String DIR_JVM_LINUX = "jvm/linux";
// public static final String DIR_JVM_MACOS = "jvm/macos";
// public static final String DIR_JVM_WINDOWS = "jvm/windows";
// public static final String DIR_JVM_NEOKYLIN_LOONGSON = "jvm/neokylin_loongson";
public static final String DIR_LOCAL = "local";
public static final String DIR_LOCAL_BACKUP = "local/backup";
public static final String DIR_LOCAL_UPDATE = "local/update";
......@@ -239,10 +237,6 @@ public class Config {
return Paths.get(base()).resolve(DIR_COMMONS_FONTS);
}
// public static File dir_commons_ext() throws Exception {
// return new File(base(), DIR_COMMONS_EXT);
// }
public static Path dir_commons_ext() throws IOException, URISyntaxException {
if (SystemUtils.IS_JAVA_11) {
return Paths.get(base()).resolve(DIR_COMMONS_EXT + "_java11");
......@@ -350,7 +344,7 @@ public class Config {
return new File(base(), DIR_LOCAL_TEMP);
}
public static File dir_local_temp_classes() throws IOException, URISyntaxException {
public static File dir_local_temp_classes() throws IOException, URISyntaxException {
return new File(base(), DIR_LOCAL_TEMP_CLASSES);
}
......@@ -933,6 +927,58 @@ public class Config {
return instance().messages;
}
private MessageRestful messageRestful;
public static synchronized MessageRestful messageRestful() throws Exception {
if (null == instance().messageRestful) {
MessageRestful obj = BaseTools.readConfigObject(PATH_CONFIG_MESSAGERESTFUL, MessageRestful.class);
if (null == obj) {
obj = MessageRestful.defaultInstance();
}
instance().messageRestful = obj;
}
return instance().messageRestful;
}
private MessageMq messageMq;
public static synchronized MessageMq messageMq() throws Exception {
if (null == instance().messageMq) {
MessageMq obj = BaseTools.readConfigObject(PATH_CONFIG_MESSAGEMQ, MessageMq.class);
if (null == obj) {
obj = MessageMq.defaultInstance();
}
instance().messageMq = obj;
}
return instance().messageMq;
}
private MessageMail messageMail;
public static synchronized MessageMail messageMail() throws Exception {
if (null == instance().messageMail) {
MessageMail obj = BaseTools.readConfigObject(PATH_CONFIG_MESSAGEMAIL, MessageMail.class);
if (null == obj) {
obj = MessageMail.defaultInstance();
}
instance().messageMail = obj;
}
return instance().messageMail;
}
private MessageApi messageApi;
public static synchronized MessageApi messageApi() throws Exception {
if (null == instance().messageApi) {
MessageApi obj = BaseTools.readConfigObject(PATH_CONFIG_MESSAGEAPI, MessageApi.class);
if (null == obj) {
obj = MessageApi.defaultInstance();
}
instance().messageApi = obj;
}
return instance().messageApi;
}
private String messageSendRuleScript;
public static synchronized String messageSendRuleScript() throws Exception {
......@@ -1051,19 +1097,6 @@ public class Config {
return instance().zhengwuDingding;
}
private Mq mq;
public static synchronized Mq mq() throws Exception {
if (null == instance().mq) {
Mq obj = BaseTools.readConfigObject(PATH_CONFIG_MQ, Mq.class);
if (null == obj) {
obj = Mq.defaultInstance();
}
instance().mq = obj;
}
return instance().mq;
}
private Vfs vfs;
public static synchronized Vfs vfs() throws Exception {
......
package com.x.base.core.project.config;
import com.x.base.core.project.annotation.FieldDescribe;
public class MQActive extends ConfigObject {
@FieldDescribe("服务器地址")
private String url;
@FieldDescribe("消息队列名")
private String queueName;
@FieldDescribe("密钥文件存储路径")
private String keyStore;
@FieldDescribe("证书文件存储路径")
private String trustStore;
@FieldDescribe("密钥密码")
private String keyStorePassword;
public static MQActive defaultInstance() {
return new MQActive();
}
public static final String default_url = "tcp://127.0.0.1:61616";
public static final String default_queueName = "queue-test";
public MQActive() {
this.url = default_url;
this.queueName = default_queueName;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getKeyStore() {
return keyStore;
}
public void setKeyStore(String keyStore) {
this.keyStore = keyStore;
}
public String getTrustStore() {
return trustStore;
}
public void setTrustStore(String trustStore) {
this.trustStore = trustStore;
}
public String getKeyStorePassword() {
return keyStorePassword;
}
public void setKeyStorePassword(String keyStorePassword) {
this.keyStorePassword = keyStorePassword;
}
}
package com.x.base.core.project.config;
import org.apache.commons.lang3.StringUtils;
import com.x.base.core.project.annotation.FieldDescribe;
public class MQKafka extends ConfigObject {
@FieldDescribe("服务器地址")
private String bootstrap_servers;
@FieldDescribe("主题")
private String topic;
@FieldDescribe("指定必须有多少个分区副本接收消息,生产者才认为消息写入成功")
private String acks;
@FieldDescribe("错误的次数")
private Integer retries;
@FieldDescribe("批次可以使用的内存大小")
private Integer batch_size;
@FieldDescribe("等待更多消息加入批次的时间")
private Integer linger_ms;
@FieldDescribe("生产者内存缓冲区的大小")
private Integer buffer_memory;
@FieldDescribe("key值的序列化类")
private String key_deserializer;
@FieldDescribe("value的序列化类")
private String value_deserializer;
public static MQKafka defaultInstance() {
return new MQKafka();
}
public static final String default_bootstrap_servers = "localhost:9092";
public static final String default_topic = "topic-test";
public static final String default_acks = "all";
public static final Integer default_retries = 0;
public static final Integer default_batch_size = 16384;
public static final Integer default_linger_ms= 1;
public static final Integer default_buffer_memory = 33554432;
public static final String default_key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
public static final String default_value_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
public MQKafka() {
this.bootstrap_servers = default_bootstrap_servers;
this.topic = default_topic;
this.acks = default_acks;
this.retries = default_retries;
this.batch_size = default_batch_size;
this.linger_ms = default_linger_ms;
this.buffer_memory= default_buffer_memory;
this.key_deserializer = default_key_deserializer;
this.value_deserializer = default_value_deserializer;
}
public String getBootstrap_servers() {
return StringUtils.isEmpty(bootstrap_servers) ? default_bootstrap_servers : this.bootstrap_servers;
}
public void setBootstrap_servers(String bootstrap_servers) {
this.bootstrap_servers = bootstrap_servers;
}
public String getTopic() {
return StringUtils.isEmpty(topic) ? default_topic : this.topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getAcks() {
return StringUtils.isEmpty(acks) ? default_acks : this.acks;
}
public void setAcks(String acks) {
this.acks = acks;
}
public Integer getRetries() {
return this.retries;
}
public void setRetries(Integer retries) {
this.retries = retries;
}
public Integer getBatch_size() {
return this.batch_size;
}
public void setBatch_size(Integer batch_size) {
this.batch_size = batch_size;
}
public Integer getLinger_ms() {
return this.linger_ms;
}
public void setLinger_ms(Integer linger_ms) {
this.linger_ms = linger_ms;
}
public Integer getBuffer_memory() {
return this.buffer_memory;
}
public void setBuffer_memory(Integer buffer_memory) {
this.buffer_memory = buffer_memory;
}
public String getKey_deserializer() {
return StringUtils.isEmpty(key_deserializer) ? default_key_deserializer : this.key_deserializer ;
}
public void setKey_deserializer(String key_deserializer) {
this.key_deserializer = key_deserializer;
}
public String getValue_deserializer() {
return StringUtils.isEmpty(value_deserializer) ? default_value_deserializer : this.value_deserializer ;
}
public void setValue_deserializer(String value_deserializer) {
this.value_deserializer = value_deserializer;
}
}
......@@ -50,16 +50,12 @@ public class Message extends GsonPropertyObject {
private Map<String, String> consumersV2 = new HashMap<>();
private Map<String, Consumer> consumersV3 = new HashMap<>();
private List<Consumer> consumersV3 = new ArrayList<>();
public Map<String, Consumer> getConsumersV3() {
public List<Consumer> getConsumersV3() {
return consumersV3;
}
public void setConsumersV3(Map<String, Consumer> consumersV3) {
this.consumersV3 = consumersV3;
}
public List<String> getConsumers() {
return consumers;
}
......@@ -76,22 +72,28 @@ public class Message extends GsonPropertyObject {
this.consumersV2 = consumersV2;
}
public boolean v3Enable() {
return (null != this.consumersV3) && (!this.consumersV3.isEmpty());
}
public static class Consumer implements Serializable {
private static final long serialVersionUID = 392932139617988800L;
@FieldDescribe("消费者名称")
private String type;
@FieldDescribe("是否启用")
private Boolean enable;
@FieldDescribe("装载器")
private String loader;
@FieldDescribe("地址")
private String url;
@FieldDescribe("方法")
private String method;
@FieldDescribe("过滤器")
private String filter;
@FieldDescribe("配置条目")
private String item;
public String getFilter() {
return filter;
}
public void setFilter(String filter) {
this.filter = filter;
}
public String getType() {
return type;
......@@ -117,20 +119,12 @@ public class Message extends GsonPropertyObject {
this.loader = loader;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getMethod() {
return method;
public String getItem() {
return item;
}
public void setMethod(String method) {
this.method = method;
public void setItem(String item) {
this.item = item;
}
}
......
package com.x.base.core.project.config;
import java.util.LinkedHashMap;
import com.x.base.core.project.annotation.FieldDescribe;
public class MessageApi extends LinkedHashMap<String, MessageApi.Item> {
private static final long serialVersionUID = 2536141863287117519L;
public static MessageApi defaultInstance() {
return new MessageApi();
}
public static class Item {
@FieldDescribe("应用")
private String application;
@FieldDescribe("路径")
private String path;
@FieldDescribe("方法")
private String method;
public String getApplication() {
return application;
}
public void setApplication(String application) {
this.application = application;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
}
}
package com.x.base.core.project.config;
import java.util.LinkedHashMap;
import com.x.base.core.project.annotation.FieldDescribe;
public class MessageMail extends LinkedHashMap<String, MessageMail.Item> {
private static final long serialVersionUID = 2536141863287117519L;
public static MessageMail defaultInstance() {
return new MessageMail();
}
public static class Item {
public Item() {
host = DEFAULT_HOST;
port = DEFAULT_PORT;
sslEnable = DEFAULT_SSLENABLE;
auth = DEFAULT_AUTH;
from = DEFAULT_FROM;
password = DEFAULT_PASSWORD;
}
public static final String DEFAULT_HOST = "";
public static final Integer DEFAULT_PORT = 465;
public static final Boolean DEFAULT_SSLENABLE = true;
public static final Boolean DEFAULT_AUTH = true;
public static final String DEFAULT_FROM = "admin@o2oa.net";
public static final String DEFAULT_PASSWORD = "password";
@FieldDescribe("smtp主机.")
private String host;
@FieldDescribe("smtp端口.")
private Integer port;
@FieldDescribe("smtp 使用ssl加密.")
private Boolean sslEnable;
@FieldDescribe("stmp启用认证.")
private Boolean auth;
@FieldDescribe("发件人.")
private String from;
@FieldDescribe("发件人密码.")
private String password;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public Boolean getSslEnable() {
return sslEnable;
}
public void setSslEnable(Boolean sslEnable) {
this.sslEnable = sslEnable;
}
public Boolean getAuth() {
return auth;
}
public void setAuth(Boolean auth) {
this.auth = auth;
}
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
}
package com.x.base.core.project.config;
import java.util.LinkedHashMap;
import com.x.base.core.project.annotation.FieldDescribe;
public class MessageMq extends LinkedHashMap<String, MessageMq.Item> {
private static final long serialVersionUID = 2536141863287117519L;
public static MessageMq defaultInstance() {
return new MessageMq();
}
public static class Item {
public static final String TYPE_KAFKA = "kafka";
public static final String TYPE_ACTIVEMQ = "activeMQ";
@FieldDescribe("类型,kafka或者activeMQ")
private String type;
@FieldDescribe("服务器地址")
private String kafkaBootstrapServers;
@FieldDescribe("主题")
private String kafkaTopic;
@FieldDescribe("指定必须有多少个分区副本接收消息,生产者才认为消息写入成功")
private String kafkaAcks;
@FieldDescribe("错误的次数")
private Integer kafkaRetries;
@FieldDescribe("批次可以使用的内存大小")
private Integer kafkaBatchSize;
@FieldDescribe("等待更多消息加入批次的时间")
private Integer kafkaLingerMs;
@FieldDescribe("生产者内存缓冲区的大小")
private Integer kafkaBufferMemory;
@FieldDescribe("key值的序列化类")
private String kafkaKeyDeserializer;
@FieldDescribe("value的序列化类")
private String kafkaValueDeserializer;
@FieldDescribe("用户名")
private String activeMQUsername;
@FieldDescribe("密码")
private String activeMQPassword;
@FieldDescribe("服务器地址")
private String activeMQUrl;
@FieldDescribe("消息队列名")
private String activeMQQueueName;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getActiveMQUsername() {
return activeMQUsername;
}
public void setActiveMQUsername(String activeMQUsername) {
this.activeMQUsername = activeMQUsername;
}
public String getActiveMQPassword() {
return activeMQPassword;
}
public void setActiveMQPassword(String activeMQPassword) {
this.activeMQPassword = activeMQPassword;
}
public String getKafkaBootstrapServers() {
return kafkaBootstrapServers;
}
public void setKafkaBootstrapServers(String kafkaBootstrapServers) {
this.kafkaBootstrapServers = kafkaBootstrapServers;
}
public String getKafkaTopic() {
return kafkaTopic;
}
public void setKafkaTopic(String kafkaTopic) {
this.kafkaTopic = kafkaTopic;
}
public String getKafkaAcks() {
return kafkaAcks;
}
public void setKafkaAcks(String kafkaAcks) {
this.kafkaAcks = kafkaAcks;
}
public Integer getKafkaRetries() {
return kafkaRetries;
}
public void setKafkaRetries(Integer kafkaRetries) {
this.kafkaRetries = kafkaRetries;
}
public Integer getKafkaBatchSize() {
return kafkaBatchSize;
}
public void setKafkaBatchSize(Integer kafkaBatchSize) {
this.kafkaBatchSize = kafkaBatchSize;
}
public Integer getKafkaLingerMs() {
return kafkaLingerMs;
}
public void setKafkaLingerMs(Integer kafkaLingerMs) {
this.kafkaLingerMs = kafkaLingerMs;
}
public Integer getKafkaBufferMemory() {
return kafkaBufferMemory;
}
public void setKafkaBufferMemory(Integer kafkaBufferMemory) {
this.kafkaBufferMemory = kafkaBufferMemory;
}
public String getKafkaKeyDeserializer() {
return kafkaKeyDeserializer;
}
public void setKafkaKeyDeserializer(String kafkaKeyDeserializer) {
this.kafkaKeyDeserializer = kafkaKeyDeserializer;
}
public String getKafkaValueDeserializer() {
return kafkaValueDeserializer;
}
public void setKafkaValueDeserializer(String kafkaValueDeserializer) {
this.kafkaValueDeserializer = kafkaValueDeserializer;
}
public String getActiveMQUrl() {
return activeMQUrl;
}
public void setActiveMQUrl(String activeMQUrl) {
this.activeMQUrl = activeMQUrl;
}
public String getActiveMQQueueName() {
return activeMQQueueName;
}
public void setActiveMQQueueName(String activeMQQueueName) {
this.activeMQQueueName = activeMQQueueName;
}
}
}
package com.x.base.core.project.config;
import java.util.LinkedHashMap;
import com.x.base.core.project.annotation.FieldDescribe;
public class MessageRestful extends LinkedHashMap<String, MessageRestful.Item> {
private static final long serialVersionUID = 2536141863287117519L;
public static MessageRestful defaultInstance() {
return new MessageRestful();
}
public static class Item {
@FieldDescribe("地址")
private String url;
@FieldDescribe("方法")
private String method;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
}
}
......@@ -6,18 +6,30 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import com.x.base.core.project.annotation.FieldDescribe;
import com.x.base.core.project.config.Message.Consumer;
import com.x.base.core.project.message.MessageConnector;
import org.apache.commons.lang3.BooleanUtils;
public class Messages extends ConcurrentSkipListMap<String, Message> {
private static final long serialVersionUID = 1336172131736006743L;
public static final Boolean DEFAULT_WEBSOCKETENABLE = true;
public static final Boolean DEFAULT_V3ENABLE = false;
public Messages() {
super();
}
@FieldDescribe("是否启用V3配置")
private Boolean v3Enable;
public Boolean v3Enable() {
return BooleanUtils.isTrue(this.v3Enable);
}
public static Messages defaultInstance() {
Messages o = new Messages();
// 示例
......@@ -114,6 +126,7 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS));
// im聊天消息发送
o.put(MessageConnector.TYPE_IM_CREATE, new Message(MessageConnector.CONSUME_WS));
o.v3Enable = DEFAULT_V3ENABLE;
return o;
}
......@@ -138,4 +151,13 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
return map;
}
public List<Consumer> getConsumersV3(String type) {
Message o = this.get(type);
List<Consumer> list = new ArrayList<>();
if ((null != o) && (null != o.getConsumersV3())) {
list.addAll(o.getConsumersV3());
}
return list;
}
}
package com.x.base.core.project.config;
import org.apache.commons.lang3.BooleanUtils;
import com.x.base.core.project.annotation.FieldDescribe;
public class Mq extends ConfigObject {
@FieldDescribe("是否启用.")
private Boolean enable;
@FieldDescribe("消息服务类型")
private String mq;
@FieldDescribe("Kafka服务器配置")
private MQKafka kafka;
@FieldDescribe("ActiveMQ服务器配置")
private MQActive activeMQ;
public static Mq defaultInstance() {
return new Mq();
}
public static final Boolean default_enable = false;
public static final String default_mq = "kafka";
public Mq() {
this.enable = default_enable;
this.mq = default_mq;
}
public Boolean getEnable() {
return BooleanUtils.isTrue(this.enable);
}
public void setEnable(Boolean enable) {
this.enable = enable;
}
public String getMq() {
return mq;
}
public void setMq(String mq) {
this.mq = mq;
}
public MQKafka getKafka() {
return kafka;
}
public void setKafka(MQKafka kafka) {
this.kafka = kafka;
}
public MQActive getActiveMQ() {
return activeMQ;
}
public void setActiveMQ(MQActive activeMQ) {
this.activeMQ = activeMQ;
}
}
......@@ -12,6 +12,10 @@ import com.x.base.core.project.gson.XGsonBuilder;
public class MessageConnector {
private MessageConnector() {
// nothing
}
private static Gson gson = XGsonBuilder.instance();
public static final String TYPE_APPLICATION_CREATE = "application_create";
......@@ -144,8 +148,10 @@ public class MessageConnector {
public static final String CONSUME_MQ = "mq";
// restful类型
public static final String CONSUME_RESTFUL = "restful";
// 邮件类型
public static final String CONSUME_MAIL = "mail";
// 内部调用
public static final String CONSUME_INVOKE = "invoke";
public static final String CONSUME_API = "api";
private static Context context;
......@@ -176,10 +182,6 @@ public class MessageConnector {
connectQueue.put(wrap);
}
public static void updateTable(Wrap warp) {
}
public static class ConnectorThread extends Thread {
@Override
public void run() {
......@@ -272,6 +274,8 @@ public class MessageConnector {
public static class StopSignal extends Wrap {
private static final long serialVersionUID = -5631247237688117035L;
}
}
......@@ -125,9 +125,6 @@ public class WebservicesClient {
private List<NameValuePair> jaxrsHeads(Map<String, String> heads) {
List<NameValuePair> list = new ArrayList<>();
// if (null == heads || (!heads.containsKey(ConnectionAction.CONTENT_TYPE))) {
// list.add(new NameValuePair(ConnectionAction.CONTENT_TYPE, ConnectionAction.CONTENT_TYPE_VALUE));
// }
if (null != heads) {
heads.entrySet().forEach(en -> list.add(new NameValuePair(en.getKey(), en.getValue())));
}
......
package com.x.message.assemble.communicate;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.MessageApi;
import com.x.base.core.project.connection.ConnectionAction;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.base.core.project.queue.AbstractQueue;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
public class ApiConsumeQueue extends AbstractQueue<Message> {
private static final Logger LOGGER = LoggerFactory.getLogger(ApiConsumeQueue.class);
private static final Pattern pattern = Pattern.compile("\\{\\$(.+?)\\}");
private static Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
LOGGER.debug("execute:{}.", message::getTitle);
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
for (String id : listOverStay()) {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
}
}
}
private Optional<Message> find(String id) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
return Optional.of(emc.find(id, Message.class));
} catch (Exception e) {
LOGGER.error(e);
}
return Optional.empty();
}
private void update(Message message) {
try {
MessageApi.Item item = Config.messageApi().get(message.getItem());
if (null != item) {
String path = path(message, item);
if (StringUtils.equalsIgnoreCase(item.getMethod(), ConnectionAction.METHOD_GET)) {
ThisApplication.context().applications().getQuery(item.getApplication(), path).getData();
ThisApplication.context().applications().getQuery(item.getApplication(), path).getData();
} else if (StringUtils.equalsIgnoreCase(item.getMethod(), ConnectionAction.METHOD_POST)) {
ThisApplication.context().applications().postQuery(item.getApplication(), path, message.getBody())
.getData();
} else if (StringUtils.equalsIgnoreCase(item.getMethod(), ConnectionAction.METHOD_DELETE)) {
ThisApplication.context().applications().deleteQuery(item.getApplication(), path).getData();
} else if (StringUtils.equalsIgnoreCase(item.getMethod(), ConnectionAction.METHOD_PUT)) {
ThisApplication.context().applications().putQuery(item.getApplication(), path, message.getBody())
.getData();
}
success(message.getId());
} else {
throw new ExceptionMessageRestfulItem(message.getItem());
}
} catch (Exception e) {
failure(message.getId(), e);
LOGGER.error(e);
}
}
private String path(Message message, MessageApi.Item item) {
String path = item.getPath();
JsonElement jsonElement = gson.toJsonTree(message.getBody());
if (jsonElement.isJsonObject()) {
JsonObject jsonObject = jsonElement.getAsJsonObject();
if (null != jsonObject) {
Matcher matcher = pattern.matcher(path);
while (matcher.find()) {
String key = matcher.group(1);
if (jsonObject.has(key)) {
String value = jsonObject.get(key).getAsString();
if (null != value) {
path = StringUtils.replace(path, matcher.group(), value);
}
}
}
}
}
return path;
}
private void success(String id) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Message message = emc.find(id, Message.class);
if (null != message) {
emc.beginTransaction(Message.class);
message.setConsumed(true);
emc.commit();
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private void failure(String id, Exception exception) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Message message = emc.find(id, Message.class);
if (null != message) {
emc.beginTransaction(Message.class);
Integer failure = message.getProperties().getFailure();
failure = (null == failure) ? 1 : failure + 1;
message.getProperties().setFailure(failure);
message.getProperties().setError(exception.getMessage());
emc.commit();
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private List<String> listOverStay() {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
EntityManager em = emc.get(Message.class);
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<String> cq = cb.createQuery(String.class);
Root<Message> root = cq.from(Message.class);
Predicate p = cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_RESTFUL);
p = cb.and(p, cb.notEqual(root.get(Message_.consumed), true));
p = cb.and(p, cb.lessThan(root.get(JpaObject_.updateTime), DateUtils.addMinutes(new Date(), -20)));
cq.select(root.get(Message_.id)).where(p);
return em.createQuery(cq).setMaxResults(20).getResultList();
} catch (Exception e) {
LOGGER.error(e);
}
return new ArrayList<>();
}
}
package com.x.message.assemble.communicate;
import com.x.base.core.project.exception.PromptException;
class ExceptionEmptyRecipient extends PromptException {
private static final long serialVersionUID = 5966961923060058124L;
ExceptionEmptyRecipient(String person) {
super("can't find {} mail address.", person);
}
}
package com.x.message.assemble.communicate;
import com.x.base.core.project.exception.PromptException;
class ExceptionMessageMailItem extends PromptException {
private static final long serialVersionUID = 5966961923060058124L;
ExceptionMessageMailItem(String item) {
super("can't find item:{} in message mail items.", item);
}
}
package com.x.message.assemble.communicate;
import com.x.base.core.project.exception.PromptException;
class ExceptionMessageMqItem extends PromptException {
private static final long serialVersionUID = 5966961923060058124L;
ExceptionMessageMqItem(String item) {
super("can't find item:{} in message mq items.", item);
}
}
package com.x.message.assemble.communicate;
import com.x.base.core.project.exception.PromptException;
class ExceptionMessageRestfulItem extends PromptException {
private static final long serialVersionUID = 5966961923060058124L;
ExceptionMessageRestfulItem(String item) {
super("can't find item:{} in message restful items.", item);
}
}
......@@ -24,238 +24,242 @@ import com.x.message.core.entity.Message;
import com.x.organization.core.entity.Person;
/**
* 发送微信公众号模版消息
* Created by fancyLou on 3/11/21.
* Copyright © 2021 O2. All rights reserved.
* 发送微信公众号模版消息 Created by fancyLou on 3/11/21. Copyright © 2021 O2. All rights
* reserved.
*/
public class MPWeixinConsumeQueue extends AbstractQueue<Message> {
private static Logger logger = LoggerFactory.getLogger(MPWeixinConsumeQueue.class);
//
// creatorPerson 创建人 activityName 当前节点 processName 流程名称 startTime 开始时间 title 标题
@Override
protected void execute(Message message) throws Exception {
String tempId = Config.mPweixin().getTempMessageId();
List<MPweixinMessageTemp> list = Config.mPweixin().getFieldList();
if (Config.mPweixin().getEnable() && Config.mPweixin().getMessageEnable() && StringUtils.isNotEmpty(tempId) && (list != null && !list.isEmpty())) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Business business = new Business(emc);
// openid 查询用户
Person person = business.message().getPersonWithCredential(message.getPerson());
if (person != null) {
String openId = person.getMpwxopenId();
logger.info("openId : "+openId);
if (StringUtils.isNotEmpty(openId)) {
JsonObject object =new JsonParser().parse(message.getBody()).getAsJsonObject();
Map<String, WeixinTempMessageFieldObj> data = new HashMap<>();
WeixinTempMessageFieldObj wobj = new WeixinTempMessageFieldObj();
wobj.setValue(message.getTitle());
data.put("first", wobj);
for (int i = 0; i < list.size(); i++) {
MPweixinMessageTemp filed = list.get(i);
String name = filed.getName();
String tempName = filed.getTempName();
String value = object.get(name).getAsString();
logger.info("解析出的结果 name:"+name + " value:"+value + " 放入tempName:"+tempName);
if ("title".equalsIgnoreCase(name)) { // 工作标题为空就用消息的标题
if (StringUtils.isEmpty(value)) {
value = "无标题";
}
} else {
if ("creatorPerson".equalsIgnoreCase(name) && StringUtils.isNotEmpty(value)) {
value = value.split("@")[0]; //截取姓名
}
if (StringUtils.isEmpty(value)) {
value = "unknown";
}
}
WeixinTempMessageFieldObj obj = new WeixinTempMessageFieldObj();
obj.setValue(value);
obj.setColor("#173177");
data.put(tempName, obj);
}
WeixinTempMessageFieldObj robj = new WeixinTempMessageFieldObj();
robj.setValue("请注意查收!");
data.put("remark", robj);
String workId = object.get("work").getAsString();
String workUrl = getOpenUrl(workId);
WeixinTempMessage wxMessage = new WeixinTempMessage();
wxMessage.setTouser(openId);
wxMessage.setUrl(workUrl);
wxMessage.setTemplate_id(tempId);
wxMessage.setTopcolor("#fb4747");
wxMessage.setData(data);
logger.info("发送的消息对象:" + wxMessage.toString());
String url = MPweixin.default_apiAddress + "/cgi-bin/message/template/send?access_token="+ Config.mPweixin().accessToken();
WeixinResponse response = HttpConnection.postAsObject(url, null, wxMessage.toString(), WeixinResponse.class);
logger.info("返回:"+response);
if (response.getErrcode() != 0) {
ExceptionQiyeweixinMessage e = new ExceptionQiyeweixinMessage(response.getErrcode(), response.getErrmsg());
logger.error(e);
}else {
Message messageEntityObject = emc.find(message.getId(), Message.class);
if (null != messageEntityObject) {
emc.beginTransaction(Message.class);
messageEntityObject.setConsumed(true);
emc.commit();
}
}
}else {
logger.info("没有绑定微信公众号 :"+ message.getPerson());
}
}else {
logger.info("没有找到用户!");
}
}
}else {
logger.info("配置文件配置条件不足!!");
}
}
private String getOpenUrl(String workId) {
try {
String httpProtocol = Config.currentNode().getCenter().getHttpProtocol();
if (StringUtils.isEmpty(httpProtocol)) {
logger.error(new Exception("没有获取到http访问协议"));
return null;
}
String host = Config.currentNode().getWeb().getProxyHost();
if (StringUtils.isEmpty(host)) {
logger.error(new Exception("没有获取到代理地址"));
return null;
}
String workUrl = "workmobilewithaction.html?workid=" + workId;
String portalId = Config.mPweixin().getPortalId();
String portal = "portalmobile.html?id="+portalId;
portal = URLEncoder.encode(portal, DefaultCharset.name);
workUrl += "&redirectlink=" + portal;
workUrl = URLEncoder.encode(workUrl, DefaultCharset.name);
String o2oaUrl = httpProtocol +"://"+ host + "/x_desktop/";
o2oaUrl = o2oaUrl+"mpweixinsso.html?redirect="+workUrl+"&type=login";
logger.info("o2oa 地址:"+o2oaUrl);
o2oaUrl = URLEncoder.encode(o2oaUrl, DefaultCharset.name);
logger.info("encode url :"+o2oaUrl);
String appId = Config.mPweixin().getAppid();
String url = "https://open.weixin.qq.com/connect/oauth2/authorize?appid="+appId
+"&redirect_uri=" + o2oaUrl
+"&response_type=code&scope=snsapi_userinfo&state=STATE#wechat_redirect";
logger.info("final url :" +url);
return url;
}catch (Exception e) {
logger.error(e);
return null;
}
}
public static class WeixinResponse {
private Integer errcode;
private String errmsg;
private Long msgid;
public Integer getErrcode() {
return errcode;
}
public void setErrcode(Integer errcode) {
this.errcode = errcode;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
public Long getMsgid() {
return msgid;
}
public void setMsgid(Long msgid) {
this.msgid = msgid;
}
}
/**
* 微信发送模版消息的对象
*/
public static class WeixinTempMessage extends GsonPropertyObject {
private String touser;
private String template_id;
private String url;
private String topcolor;
private Map<String, WeixinTempMessageFieldObj> data; // 模版字段数据
public String getTouser() {
return touser;
}
public void setTouser(String touser) {
this.touser = touser;
}
public String getTemplate_id() {
return template_id;
}
public void setTemplate_id(String template_id) {
this.template_id = template_id;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getTopcolor() {
return topcolor;
}
public void setTopcolor(String topcolor) {
this.topcolor = topcolor;
}
public Map<String, WeixinTempMessageFieldObj> getData() {
return data;
}
public void setData(Map<String, WeixinTempMessageFieldObj> data) {
this.data = data;
}
}
/**
* 模版字段对象
*/
public static class WeixinTempMessageFieldObj extends GsonPropertyObject {
private String value;
private String color;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
}
private static Logger logger = LoggerFactory.getLogger(MPWeixinConsumeQueue.class);
//
// creatorPerson 创建人 activityName 当前节点 processName 流程名称 startTime 开始时间 title 标题
@Override
protected void execute(Message message) throws Exception {
String tempId = Config.mPweixin().getTempMessageId();
List<MPweixinMessageTemp> list = Config.mPweixin().getFieldList();
if (Config.mPweixin().getEnable() && Config.mPweixin().getMessageEnable() && StringUtils.isNotEmpty(tempId)
&& (list != null && !list.isEmpty())) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Business business = new Business(emc);
// openid 查询用户
Person person = business.message().getPersonWithCredential(message.getPerson());
if (person != null) {
String openId = person.getMpwxopenId();
logger.info("openId : " + openId);
if (StringUtils.isNotEmpty(openId)) {
JsonObject object = new JsonParser().parse(message.getBody()).getAsJsonObject();
Map<String, WeixinTempMessageFieldObj> data = new HashMap<>();
WeixinTempMessageFieldObj wobj = new WeixinTempMessageFieldObj();
wobj.setValue(message.getTitle());
data.put("first", wobj);
for (int i = 0; i < list.size(); i++) {
MPweixinMessageTemp filed = list.get(i);
String name = filed.getName();
String tempName = filed.getTempName();
String value = object.get(name).getAsString();
logger.info("解析出的结果 name:" + name + " value:" + value + " 放入tempName:" + tempName);
if ("title".equalsIgnoreCase(name)) { // 工作标题为空就用消息的标题
if (StringUtils.isEmpty(value)) {
value = "无标题";
}
} else {
if ("creatorPerson".equalsIgnoreCase(name) && StringUtils.isNotEmpty(value)) {
value = value.split("@")[0]; // 截取姓名
}
if (StringUtils.isEmpty(value)) {
value = "unknown";
}
}
WeixinTempMessageFieldObj obj = new WeixinTempMessageFieldObj();
obj.setValue(value);
obj.setColor("#173177");
data.put(tempName, obj);
}
WeixinTempMessageFieldObj robj = new WeixinTempMessageFieldObj();
robj.setValue("请注意查收!");
data.put("remark", robj);
String workId = object.get("work").getAsString();
String workUrl = getOpenUrl(workId);
WeixinTempMessage wxMessage = new WeixinTempMessage();
wxMessage.setTouser(openId);
wxMessage.setUrl(workUrl);
wxMessage.setTemplate_id(tempId);
wxMessage.setTopcolor("#fb4747");
wxMessage.setData(data);
logger.info("发送的消息对象:" + wxMessage.toString());
String url = MPweixin.default_apiAddress + "/cgi-bin/message/template/send?access_token="
+ Config.mPweixin().accessToken();
WeixinResponse response = HttpConnection.postAsObject(url, null, wxMessage.toString(),
WeixinResponse.class);
logger.info("返回:" + response);
if (response.getErrcode() != 0) {
ExceptionQiyeweixinMessage e = new ExceptionQiyeweixinMessage(response.getErrcode(),
response.getErrmsg());
logger.error(e);
} else {
Message messageEntityObject = emc.find(message.getId(), Message.class);
if (null != messageEntityObject) {
emc.beginTransaction(Message.class);
messageEntityObject.setConsumed(true);
emc.commit();
}
}
} else {
logger.info("没有绑定微信公众号 :" + message.getPerson());
}
} else {
logger.info("没有找到用户!");
}
}
} else {
logger.info("配置文件配置条件不足!!");
}
}
private String getOpenUrl(String workId) {
try {
String httpProtocol = Config.currentNode().getCenter().getHttpProtocol();
if (StringUtils.isEmpty(httpProtocol)) {
logger.error(new Exception("没有获取到http访问协议"));
return null;
}
String host = Config.currentNode().getWeb().getProxyHost();
if (StringUtils.isEmpty(host)) {
logger.error(new Exception("没有获取到代理地址"));
return null;
}
String workUrl = "workmobilewithaction.html?workid=" + workId;
String portalId = Config.mPweixin().getPortalId();
String portal = "portalmobile.html?id=" + portalId;
portal = URLEncoder.encode(portal, DefaultCharset.name);
workUrl += "&redirectlink=" + portal;
workUrl = URLEncoder.encode(workUrl, DefaultCharset.name);
String o2oaUrl = httpProtocol + "://" + host + "/x_desktop/";
o2oaUrl = o2oaUrl + "mpweixinsso.html?redirect=" + workUrl + "&type=login";
logger.info("o2oa 地址:" + o2oaUrl);
o2oaUrl = URLEncoder.encode(o2oaUrl, DefaultCharset.name);
logger.info("encode url :" + o2oaUrl);
String appId = Config.mPweixin().getAppid();
String url = "https://open.weixin.qq.com/connect/oauth2/authorize?appid=" + appId + "&redirect_uri="
+ o2oaUrl + "&response_type=code&scope=snsapi_userinfo&state=STATE#wechat_redirect";
logger.info("final url :" + url);
return url;
} catch (Exception e) {
logger.error(e);
return null;
}
}
public static class WeixinResponse {
private Integer errcode;
private String errmsg;
private Long msgid;
public Integer getErrcode() {
return errcode;
}
public void setErrcode(Integer errcode) {
this.errcode = errcode;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
public Long getMsgid() {
return msgid;
}
public void setMsgid(Long msgid) {
this.msgid = msgid;
}
}
/**
* 微信发送模版消息的对象
*/
public static class WeixinTempMessage extends GsonPropertyObject {
private static final long serialVersionUID = 1577792511569896768L;
private String touser;
private String template_id;
private String url;
private String topcolor;
private Map<String, WeixinTempMessageFieldObj> data; // 模版字段数据
public String getTouser() {
return touser;
}
public void setTouser(String touser) {
this.touser = touser;
}
public String getTemplate_id() {
return template_id;
}
public void setTemplate_id(String template_id) {
this.template_id = template_id;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getTopcolor() {
return topcolor;
}
public void setTopcolor(String topcolor) {
this.topcolor = topcolor;
}
public Map<String, WeixinTempMessageFieldObj> getData() {
return data;
}
public void setData(Map<String, WeixinTempMessageFieldObj> data) {
this.data = data;
}
}
/**
* 模版字段对象
*/
public static class WeixinTempMessageFieldObj extends GsonPropertyObject {
private static final long serialVersionUID = -4230870572917531355L;
private String value;
private String color;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getColor() {
return color;
}
public void setColor(String color) {
this.color = color;
}
}
}
package com.x.message.assemble.communicate;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Order;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.google.gson.Gson;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.MessageMq;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.base.core.project.queue.AbstractQueue;
import com.x.message.assemble.communicate.mq.ActiveMQ;
import com.x.message.assemble.communicate.mq.KafkaMQ;
import com.x.message.assemble.communicate.mq.MQInterface;
import com.x.base.core.project.tools.ListTools;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
public class MQConsumeQueue extends AbstractQueue<Message> {
private static Logger logger = LoggerFactory.getLogger(MQConsumeQueue.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeQueue.class);
private static final Gson gson = XGsonBuilder.instance();
protected void execute(Message message) throws Exception {
logger.info("MQConsumeQueue message.getTitle:"+ message.getTitle());
if (Config.mq().getEnable()) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Business business = new Business(emc);
MQInterface MQClient;
EntityManager em = business.entityManagerContainer().get(Message.class);
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<Message> cq = cb.createQuery(Message.class);
Root<Message> root = cq.from(Message.class);
Order order = cb.desc(root.get(Message_.createTime));
Predicate p = cb.notEqual(root.get(Message_.consumed), true);
p = cb.and(p, cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_MQ));
logger.info(p.toString());
List<Message> messages = em.createQuery(cq.select(root).where(p).orderBy(order)).setMaxResults(50).getResultList();
if(messages.size()>0) {
if(Config.mq().getMq().equalsIgnoreCase("kafka")) {
MQClient = KafkaMQ.getInstance();
}else {
MQClient = ActiveMQ.getInstance();
}
if(MQClient != null) {
for(Message mes : messages) {
boolean res = MQClient.sendMessage(mes);
if (res == false) {
Gson gson = new Gson();
String msg = gson.toJson(mes);
ExceptionMQMessage e = new ExceptionMQMessage(0, msg);
logger.error(e);
} else {
Message messageEntityObject = emc.find(mes.getId(), Message.class);
if (null != messageEntityObject) {
emc.beginTransaction(Message.class);
messageEntityObject.setConsumed(true);
emc.commit();
}
}
}
}
}
}
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
for (String id : listOverStay()) {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
}
}
}
private Optional<Message> find(String id) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
return Optional.of(emc.find(id, Message.class));
} catch (Exception e) {
LOGGER.error(e);
}
return Optional.empty();
}
private void update(Message message) {
try {
MessageMq.Item item = Config.messageMq().get(message.getItem());
if (null != item) {
if (StringUtils.equalsIgnoreCase(message.getType(), MessageMq.Item.TYPE_KAFKA)) {
kafka(message, item);
} else if (StringUtils.equalsIgnoreCase(message.getType(), MessageMq.Item.TYPE_ACTIVEMQ)) {
activeMQ(message, item);
}
success(message.getId());
} else {
throw new ExceptionMessageMqItem(message.getItem());
}
} catch (InterruptedException ie) {
LOGGER.error(ie);
Thread.currentThread().interrupt();
} catch (Exception e) {
failure(message.getId(), e);
LOGGER.error(e);
}
}
private void kafka(Message message, MessageMq.Item item) throws InterruptedException, ExecutionException {
Properties properties = new Properties();
properties.put("bootstrap.servers", item.getKafkaBootstrapServers());
properties.put("acks", item.getKafkaAcks());
properties.put("retries", item.getKafkaRetries());
properties.put("batch.size", item.getKafkaBatchSize());
properties.put("linger.ms", item.getKafkaLingerMs());
properties.put("buffer.memory", item.getKafkaBufferMemory());
properties.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
properties.put("value.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
String topic = item.getKafkaTopic();
String msg = gson.toJson(message);
producer.send(new ProducerRecord<>(topic, msg)).get();
}
}
private void activeMQ(Message message, MessageMq.Item item) throws JMSException {
ActiveMQConnectionFactory connectionFactory;
if (StringUtils.isNotBlank(item.getActiveMQUsername())) {
connectionFactory = new ActiveMQConnectionFactory(item.getActiveMQUsername(), item.getActiveMQPassword(),
item.getActiveMQUrl());
} else {
connectionFactory = new ActiveMQConnectionFactory(item.getActiveMQUrl());
}
connectionFactory.setTrustedPackages(ListTools.toList(MQConsumeQueue.class.getPackage().getName()));
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Destination destination = session.createQueue(item.getActiveMQQueueName());
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage textMessage = session.createTextMessage(gson.toJson(message));
producer.send(textMessage);
}
}
}
private void success(String id) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Message message = emc.find(id, Message.class);
if (null != message) {
emc.beginTransaction(Message.class);
message.setConsumed(true);
emc.commit();
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private void failure(String id, Exception exception) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Message message = emc.find(id, Message.class);
if (null != message) {
emc.beginTransaction(Message.class);
Integer failure = message.getProperties().getFailure();
failure = (null == failure) ? 1 : failure + 1;
message.getProperties().setFailure(failure);
message.getProperties().setError(exception.getMessage());
emc.commit();
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private List<String> listOverStay() {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
EntityManager em = emc.get(Message.class);
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<String> cq = cb.createQuery(String.class);
Root<Message> root = cq.from(Message.class);
Predicate p = cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_MQ);
p = cb.and(p, cb.notEqual(root.get(Message_.consumed), true));
p = cb.and(p, cb.lessThan(root.get(JpaObject_.updateTime), DateUtils.addMinutes(new Date(), -20)));
cq.select(root.get(Message_.id)).where(p);
return em.createQuery(cq).setMaxResults(20).getResultList();
} catch (Exception e) {
LOGGER.error(e);
}
return new ArrayList<>();
}
}
package com.x.message.assemble.communicate;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.MessageMail;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.base.core.project.organization.Person;
import com.x.base.core.project.queue.AbstractQueue;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
public class MailConsumeQueue extends AbstractQueue<Message> {
private static final Logger LOGGER = LoggerFactory.getLogger(MailConsumeQueue.class);
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
for (String id : listOverStay()) {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
}
}
}
private Optional<Message> find(String id) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
return Optional.of(emc.find(id, Message.class));
} catch (Exception e) {
LOGGER.error(e);
}
return Optional.empty();
}
private void update(Message message) {
try {
MessageMail.Item item = Config.messageMail().get(message.getItem());
if (null != item) {
send(message, item);
success(message.getId());
} else {
throw new ExceptionMessageMailItem(message.getItem());
}
} catch (Exception e) {
failure(message.getId(), e);
LOGGER.error(e);
}
}
private String getRecipient(Message message) throws Exception {
String value = "";
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Business business = new Business(emc);
Person person = business.organization().person().getObject(message.getPerson());
value = person.getMail();
}
return value;
}
private void send(Message message, MessageMail.Item item) throws Exception {
String recipient = getRecipient(message);
if (StringUtils.isBlank(recipient)) {
throw new ExceptionEmptyRecipient(message.getPerson());
}
Properties properties = new Properties();
properties.put("mail.smtp.host", item.getHost());
properties.put("mail.smtp.port", item.getPort());
properties.put("mail.smtp.ssl.enable", item.getSslEnable());
properties.put("mail.smtp.auth", item.getAuth());
Session session = Session.getInstance(properties, new javax.mail.Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(item.getFrom(), item.getPassword());
}
});
MimeMessage mime = new MimeMessage(session);
mime.setFrom(new InternetAddress(item.getFrom()));
mime.addRecipient(javax.mail.Message.RecipientType.TO, new InternetAddress(recipient));
mime.setSubject(message.getTitle());
mime.setText(message.getBody());
Transport.send(mime);
}
private void success(String id) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Message message = emc.find(id, Message.class);
if (null != message) {
emc.beginTransaction(Message.class);
message.setConsumed(true);
emc.commit();
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private void failure(String id, Exception exception) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Message message = emc.find(id, Message.class);
if (null != message) {
emc.beginTransaction(Message.class);
Integer failure = message.getProperties().getFailure();
failure = (null == failure) ? 1 : failure + 1;
message.getProperties().setFailure(failure);
message.getProperties().setError(exception.getMessage());
emc.commit();
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private List<String> listOverStay() {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
EntityManager em = emc.get(Message.class);
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<String> cq = cb.createQuery(String.class);
Root<Message> root = cq.from(Message.class);
Predicate p = cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_MAIL);
p = cb.and(p, cb.notEqual(root.get(Message_.consumed), true));
p = cb.and(p, cb.lessThan(root.get(JpaObject_.updateTime), DateUtils.addMinutes(new Date(), -20)));
cq.select(root.get(Message_.id)).where(p);
return em.createQuery(cq).setMaxResults(20).getResultList();
} catch (Exception e) {
LOGGER.error(e);
}
return new ArrayList<>();
}
}
package com.x.message.assemble.communicate;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.MessageRestful;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.base.core.project.queue.AbstractQueue;
import com.x.base.core.project.webservices.WebservicesClient;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
public class RestfulConsumeQueue extends AbstractQueue<Message> {
private static final Logger LOGGER = LoggerFactory.getLogger(RestfulConsumeQueue.class);
private static final Pattern pattern = Pattern.compile("\\{\\$(.+?)\\}");
private static final Gson gson = XGsonBuilder.instance();
private static WebservicesClient client = new WebservicesClient();
protected void execute(Message message) throws Exception {
if (null != message && StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
for (String id : listOverStay()) {
Optional<Message> optional = find(id);
if (optional.isPresent()) {
message = optional.get();
if (StringUtils.isNotEmpty(message.getItem())) {
update(message);
}
}
}
}
private Optional<Message> find(String id) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
return Optional.of(emc.find(id, Message.class));
} catch (Exception e) {
LOGGER.error(e);
}
return Optional.empty();
}
private void update(Message message) {
try {
MessageRestful.Item item = Config.messageRestful().get(message.getItem());
if (null != item) {
String url = url(message, item);
client.restful(item.getMethod(), url, null, message.getBody(), 5000, 5000);
success(message.getId());
} else {
throw new ExceptionMessageRestfulItem(message.getItem());
}
} catch (Exception e) {
failure(message.getId(), e);
LOGGER.error(e);
}
}
private String url(Message message, MessageRestful.Item item) {
String url = item.getUrl();
JsonElement jsonElement = gson.toJsonTree(message.getBody());
if (jsonElement.isJsonObject()) {
JsonObject jsonObject = jsonElement.getAsJsonObject();
if (null != jsonObject) {
Matcher matcher = pattern.matcher(url);
while (matcher.find()) {
String key = matcher.group(1);
if (jsonObject.has(key)) {
String value = jsonObject.get(key).getAsString();
if (null != value) {
url = StringUtils.replace(url, matcher.group(), value);
}
}
}
}
}
return url;
}
private void success(String id) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Message message = emc.find(id, Message.class);
if (null != message) {
emc.beginTransaction(Message.class);
message.setConsumed(true);
emc.commit();
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private void failure(String id, Exception exception) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Message message = emc.find(id, Message.class);
if (null != message) {
emc.beginTransaction(Message.class);
Integer failure = message.getProperties().getFailure();
failure = (null == failure) ? 1 : failure + 1;
message.getProperties().setFailure(failure);
message.getProperties().setError(exception.getMessage());
emc.commit();
}
} catch (Exception e) {
LOGGER.error(e);
}
}
private List<String> listOverStay() {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
EntityManager em = emc.get(Message.class);
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<String> cq = cb.createQuery(String.class);
Root<Message> root = cq.from(Message.class);
Predicate p = cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_RESTFUL);
p = cb.and(p, cb.notEqual(root.get(Message_.consumed), true));
p = cb.and(p, cb.lessThan(root.get(JpaObject_.updateTime), DateUtils.addMinutes(new Date(), -20)));
cq.select(root.get(Message_.id)).where(p);
return em.createQuery(cq).setMaxResults(20).getResultList();
} catch (Exception e) {
LOGGER.error(e);
}
return new ArrayList<>();
}
}
......@@ -7,7 +7,7 @@ import com.x.base.core.project.cache.CacheManager;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.message.MessageConnector;
import com.x.message.assemble.communicate.schedule.Clean;
import com.x.message.assemble.communicate.schedule.TriggerMq;
import com.x.message.assemble.communicate.schedule.TriggerMessageConsumeQueue;
public class ThisApplication {
......@@ -37,6 +37,12 @@ public class ThisApplication {
public static final MPWeixinConsumeQueue mpWeixinConsumeQueue = new MPWeixinConsumeQueue();
public static final RestfulConsumeQueue restfulConsumeQueue = new RestfulConsumeQueue();
public static final MailConsumeQueue mailConsumeQueue = new MailConsumeQueue();
public static final ApiConsumeQueue apiConsumeQueue = new ApiConsumeQueue();
public static Context context() {
return context;
}
......@@ -49,8 +55,9 @@ public class ThisApplication {
if (BooleanUtils.isTrue(Config.communicate().clean().getEnable())) {
context().schedule(Clean.class, Config.communicate().clean().getCron());
}
if (BooleanUtils.isTrue(Config.communicate().cronMq().getEnable())) {
context().schedule(TriggerMq.class, Config.communicate().cronMq().getCron());
if (BooleanUtils.isTrue(Config.communicate().triggerMessageConsumeQueue().getEnable())) {
context().schedule(TriggerMessageConsumeQueue.class,
Config.communicate().triggerMessageConsumeQueue().getCron());
}
} catch (Exception e) {
e.printStackTrace();
......@@ -59,6 +66,10 @@ public class ThisApplication {
private static void startQueue() throws Exception {
startCommunicateQueue();
context().startQueue(restfulConsumeQueue);
context().startQueue(apiConsumeQueue);
context().startQueue(mailConsumeQueue);
context().startQueue(mqConsumeQueue);
if (BooleanUtils.isTrue(Config.qiyeweixin().getEnable())
&& BooleanUtils.isTrue(Config.qiyeweixin().getMessageEnable())) {
context().startQueue(qiyeweixinConsumeQueue);
......@@ -76,9 +87,6 @@ public class ThisApplication {
if (Config.weLink().getEnable() && Config.weLink().getMessageEnable()) {
context().startQueue(weLinkConsumeQueue);
}
if (BooleanUtils.isTrue(Config.mq().getEnable())) {
context().startQueue(mqConsumeQueue);
}
if (BooleanUtils.isTrue(Config.mPweixin().getEnable())
&& BooleanUtils.isTrue(Config.mPweixin().getMessageEnable())) {
context().startQueue(mpWeixinConsumeQueue);
......
......@@ -19,20 +19,21 @@ import com.x.message.core.entity.Message;
public class WsConsumeQueue extends AbstractQueue<Message> {
private static Logger logger = LoggerFactory.getLogger(WsConsumeQueue.class);
private static final Logger LOGGER = LoggerFactory.getLogger(WsConsumeQueue.class);
private static final String TASK_FIRST = "first";
private static final String WORKCREATETYPE = "workCreateType";
private static final String SURFACE = "surface";
protected void execute(Message message) throws Exception {
LOGGER.debug("execute:{}.", message::getTitle);
WsMessage ws = new WsMessage();
ws.setType(message.getType());
ws.setPerson(message.getPerson());
ws.setTitle(message.getTitle());
JsonElement jsonElement = XGsonBuilder.instance().fromJson(message.getBody(), JsonElement.class);
ws.setBody(jsonElement);
Boolean result = false;
boolean result = false;
/* 跳过第一条待办的提醒 */
if (StringUtils.equalsIgnoreCase(ws.getType(), MessageConnector.TYPE_TASK_CREATE)
&& BooleanUtils.isTrue(XGsonBuilder.extractBoolean(jsonElement, TASK_FIRST))
......
package com.x.message.assemble.communicate.jaxrs.connector;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -11,16 +15,15 @@ import javax.script.Bindings;
import javax.script.CompiledScript;
import javax.script.ScriptContext;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.annotation.CheckPersistType;
import com.x.base.core.project.cache.Cache.CacheKey;
import com.x.base.core.project.cache.CacheManager;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.Message.Consumer;
import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.jaxrs.WrapBoolean;
......@@ -34,6 +37,10 @@ import com.x.message.assemble.communicate.ThisApplication;
import com.x.message.core.entity.Instant;
import com.x.message.core.entity.Message;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
class ActionCreate extends BaseAction {
private static final Logger LOGGER = LoggerFactory.getLogger(ActionCreate.class);
......@@ -45,11 +52,19 @@ class ActionCreate extends BaseAction {
ActionResult<Wo> result = new ActionResult<>();
Wi wi = this.convertToWrapIn(jsonElement, Wi.class);
Map<String, String> consumersV2 = Config.messages().getConsumersV2(wi.getType());
Instant instant = this.instant(wi, new ArrayList<>(consumersV2.keySet()));
List<Message> messages = assemble(wi, consumersV2, instant);
save(instant, messages);
this.sendMessage(messages);
if (BooleanUtils.isTrue(Config.messages().v3Enable())) {
List<Consumer> consumers = Config.messages().getConsumersV3(wi.getType());
Instant instant = v3instant(wi);
List<Message> messages = v3Assemble(wi, instant, consumers);
v3Save(instant, messages);
this.v3SendMessage(messages);
} else {
Map<String, String> consumersV2 = Config.messages().getConsumersV2(wi.getType());
Instant instant = this.instant(wi, new ArrayList<>(consumersV2.keySet()));
List<Message> messages = assemble(wi, consumersV2, instant);
save(instant, messages);
this.sendMessage(messages);
}
Wo wo = new Wo();
wo.setValue(true);
result.setData(wo);
......@@ -201,10 +216,6 @@ class ActionCreate extends BaseAction {
sendMessageMPWeixin(message);
break;
default:
if (message.getConsumer().startsWith(MessageConnector.CONSUME_MQ)
&& BooleanUtils.isTrue(Config.mq().getEnable())) {
ThisApplication.mqConsumeQueue.send(message);
}
break;
}
}
......@@ -218,9 +229,7 @@ class ActionCreate extends BaseAction {
}
private void sendMessageMq(Message message) throws Exception {
if (BooleanUtils.isTrue(Config.mq().getEnable())) {
ThisApplication.mqConsumeQueue.send(message);
}
ThisApplication.mqConsumeQueue.send(message);
}
private void sendMessagePmsInner(Message message) throws Exception {
......@@ -498,4 +507,335 @@ class ActionCreate extends BaseAction {
}
private Instant v3instant(Wi wi) {
Instant instant = new Instant();
instant.setBody(Objects.toString(wi.getBody()));
instant.setType(wi.getType());
instant.setPerson(wi.getPerson());
instant.setTitle(wi.getTitle());
instant.setConsumed(false);
return instant;
}
private List<Message> v3Assemble(Wi wi, Instant instant, List<Consumer> consumers) {
List<Message> messages = new ArrayList<>();
if (!consumers.isEmpty()) {
for (Consumer consumer : consumers) {
if (BooleanUtils.isTrue(consumer.getEnable())) {
Message message = this.v3AssembleMessage(wi, consumer, instant);
if (message != null) {
messages.add(message);
}
}
}
}
return messages;
}
private Message v3AssembleMessage(Wi wi, Consumer consumer, Instant instant) {
Message message = null;
String type = Objects.toString(consumer.getType(), "");
switch (type) {
case MessageConnector.CONSUME_WS:
message = this.v3WsMessage(wi, consumer);
break;
case MessageConnector.CONSUME_PMS:
message = this.v3PmsMessage(wi, consumer);
break;
case MessageConnector.CONSUME_PMS_INNER:
message = this.v3PmsInnerMessage(wi, consumer);
break;
case MessageConnector.CONSUME_DINGDING:
message = this.v3DingdingMessage(wi, consumer);
break;
case MessageConnector.CONSUME_ZHENGWUDINGDING:
message = this.v3ZhengwudingdingMessage(wi, consumer);
break;
case MessageConnector.CONSUME_QIYEWEIXIN:
message = this.v3QiyeweixinMessage(wi, consumer);
break;
case MessageConnector.CONSUME_WELINK:
message = this.v3WeLinkMessage(wi, consumer);
break;
case MessageConnector.CONSUME_MPWEIXIN:
message = this.v3MpweixinMessage(wi, consumer);
break;
case MessageConnector.CONSUME_CALENDAR:
message = this.v3CalendarMessage(wi, consumer);
break;
case MessageConnector.CONSUME_RESTFUL:
message = this.v3Message(wi, consumer);
break;
case MessageConnector.CONSUME_MQ:
message = this.v3Message(wi, consumer);
break;
case MessageConnector.CONSUME_API:
message = this.v3Message(wi, consumer);
break;
case MessageConnector.CONSUME_MAIL:
message = this.v3Message(wi, consumer);
break;
default:
message = this.v3DefaultMessage(wi, consumer);
break;
}
if (null != message) {
message.setInstant(instant.getId());
}
return message;
}
private Message v3Message(Wi wi, Consumer consumer) {
Message message = new Message();
message.setBody(Objects.toString(v3load(wi, consumer)));
message.setType(wi.getType());
message.setPerson(wi.getPerson());
message.setTitle(wi.getTitle());
message.setConsumed(false);
message.setConsumer(consumer.getType());
message.setItem(consumer.getItem());
return message;
}
private Message v3WsMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.communicate().wsEnable()) && BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3PmsMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.communicate().pmsEnable()) && BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3PmsInnerMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.pushConfig().getEnable()) && BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3DingdingMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.dingding().getEnable())
&& BooleanUtils.isTrue(Config.dingding().getMessageEnable())
&& BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3ZhengwudingdingMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (Config.zhengwuDingding().getEnable() && Config.zhengwuDingding().getMessageEnable()
&& BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3QiyeweixinMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.qiyeweixin().getEnable())
&& BooleanUtils.isTrue(Config.qiyeweixin().getMessageEnable())
&& BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3WeLinkMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.weLink().getEnable())
&& BooleanUtils.isTrue(Config.weLink().getMessageEnable())
&& BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3MpweixinMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.mPweixin().getEnable())
&& BooleanUtils.isTrue(Config.mPweixin().getMessageEnable())
&& BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3CalendarMessage(Wi wi, Consumer consumer) {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.communicate().calendarEnable())
&& BooleanUtils.isTrue(v3Filter(wi, consumer))) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
LOGGER.error(e);
}
return message;
}
private Message v3DefaultMessage(Wi wi, Consumer consumer) {
return v3Message(wi, consumer);
}
private boolean v3Filter(Wi wi, Consumer consumer) {
try {
if (StringUtils.isNotBlank(consumer.getFilter())) {
CacheKey cacheKey = new CacheKey(this.getClass(), consumer.getFilter());
Optional<?> optional = CacheManager.get(cacheCategory, cacheKey);
CompiledScript compiledScript = null;
if (optional.isPresent()) {
compiledScript = (CompiledScript) optional.get();
} else {
Path path = Config.dir_config().toPath().resolve(consumer.getFilter());
compiledScript = ScriptingFactory
.functionalizationCompile(Files.readString(path, StandardCharsets.UTF_8));
CacheManager.put(cacheCategory, cacheKey, compiledScript);
}
if (compiledScript != null) {
ScriptContext scriptContext = ScriptingFactory.scriptContextEvalInitialServiceScript();
Bindings bindings = scriptContext.getBindings(ScriptContext.ENGINE_SCOPE);
bindings.put(ScriptingFactory.BINDING_NAME_SERVICE_MESSAGE, wi.getBody());
Boolean filter = JsonScriptingExecutor.evalBoolean(compiledScript, scriptContext);
return BooleanUtils.isTrue(filter);
}
}
} catch (Exception e) {
LOGGER.warn("执行filter脚本 {} 异常:{}.", consumer.getLoader(), e.getMessage());
}
return true;
}
private JsonElement v3load(Wi wi, Consumer consumer) {
JsonElement jsonElement = wi.getBody();
try {
if (StringUtils.isNotBlank(consumer.getLoader())) {
CacheKey cacheKey = new CacheKey(this.getClass(), consumer.getLoader());
Optional<?> optional = CacheManager.get(cacheCategory, cacheKey);
CompiledScript compiledScript = null;
if (optional.isPresent()) {
compiledScript = (CompiledScript) optional.get();
} else {
Path path = Config.dir_config().toPath().resolve(consumer.getLoader());
compiledScript = ScriptingFactory
.functionalizationCompile(Files.readString(path, StandardCharsets.UTF_8));
CacheManager.put(cacheCategory, cacheKey, compiledScript);
}
if (compiledScript != null) {
ScriptContext scriptContext = ScriptingFactory.scriptContextEvalInitialServiceScript();
Bindings bindings = scriptContext.getBindings(ScriptContext.ENGINE_SCOPE);
bindings.put(ScriptingFactory.BINDING_NAME_SERVICE_MESSAGE, wi.getBody());
jsonElement = JsonScriptingExecutor.jsonElement(compiledScript, scriptContext);
}
}
} catch (Exception e) {
LOGGER.warn("执行loader脚本 {} 异常:{}.", consumer.getLoader(), e.getMessage());
}
return jsonElement;
}
private void v3Save(Instant instant, List<Message> messages) throws Exception {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
emc.beginTransaction(Instant.class);
emc.persist(instant, CheckPersistType.all);
if (ListTools.isNotEmpty(messages)) {
emc.beginTransaction(Message.class);
for (Message message : messages) {
emc.persist(message, CheckPersistType.all);
}
}
emc.commit();
}
}
private void v3SendMessage(List<Message> messages) throws Exception {
for (Message message : messages) {
switch (message.getConsumer()) {
case MessageConnector.CONSUME_WS:
ThisApplication.wsConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_PMS:
ThisApplication.pmsConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_CALENDAR:
ThisApplication.calendarConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_DINGDING:
ThisApplication.dingdingConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_WELINK:
ThisApplication.weLinkConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_ZHENGWUDINGDING:
ThisApplication.zhengwuDingdingConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_QIYEWEIXIN:
ThisApplication.qiyeweixinConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_PMS_INNER:
ThisApplication.pmsInnerConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_MPWEIXIN:
ThisApplication.mpWeixinConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_RESTFUL:
ThisApplication.restfulConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_MQ:
ThisApplication.mqConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_MAIL:
ThisApplication.mailConsumeQueue.send(message);
break;
case MessageConnector.CONSUME_API:
ThisApplication.apiConsumeQueue.send(message);
break;
default:
break;
}
}
}
}
package com.x.message.assemble.communicate.jaxrs.connector;
import com.x.base.core.project.cache.Cache.CacheCategory;
import com.x.base.core.project.jaxrs.StandardJaxrsAction;
import com.x.message.core.entity.Instant;
import com.x.message.core.entity.Message;
abstract class BaseAction extends StandardJaxrsAction {
protected CacheCategory cacheCategory = new CacheCategory(Instant.class, Message.class);
}
\ No newline at end of file
package com.x.message.assemble.communicate.mq;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import com.google.gson.Gson;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.MQActive;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.message.core.entity.Message;
public class ActiveMQ implements MQInterface {
private static Logger logger = LoggerFactory.getLogger(ActiveMQ.class);
private Connection connection = null;
private MessageProducer producer = null;
private Session session = null;
private ActiveMQ() {
try {
MQActive configMQ = Config.mq().getActiveMQ();
logger.info("MqActive initialize.....");
String queueName = configMQ.getQueueName();
String url = configMQ.getUrl();
url = url.trim();
String protocol = url.substring(0, 3);
if (protocol.equalsIgnoreCase("tcp")) {
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
this.connection = factory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
this.producer = session.createProducer(destination);
} else {
String keyStore = configMQ.getKeyStore();
String keyStorePassword = configMQ.getKeyStorePassword();
String trustStore = configMQ.getTrustStore();
ActiveMQSslConnectionFactory sslConnectionFactory = new ActiveMQSslConnectionFactory();
sslConnectionFactory.setBrokerURL(url);
sslConnectionFactory.setKeyAndTrustManagers(this.loadKeyManager(keyStore, keyStorePassword),
this.loadTrustManager(trustStore), new java.security.SecureRandom());
this.connection = sslConnectionFactory.createConnection();
this.connection.start();
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
this.producer = session.createProducer(destination);
}
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
}
}
private static class MQHolder {
private static ActiveMQ instance = new ActiveMQ();
}
public static ActiveMQ getInstance() {
return MQHolder.instance;
}
public static void main(String[] args) {
ActiveMQ MQClient = getInstance();
Message msg = new Message();
msg.setBody("body");
msg.setConsumed(false);
msg.setCreateTime(new Date());
msg.setPerson("person");
MQClient.sendMessage(msg);
}
@Override
public boolean sendMessage(Message message) {
try {
Gson gson = new Gson();
String msg = gson.toJson(message);
TextMessage textMessage = this.session.createTextMessage(msg);
this.producer.send(textMessage);
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
return false;
} finally {
}
return true;
}
public void destroy() {
try {
logger.info("MqActive destroy.....");
this.connection.close();
} catch (JMSException e) {
e.printStackTrace();
logger.error(e);
}
}
/**
* 加载证书文件
*
* @param trustStore
* @return
* @throws java.security.NoSuchAlgorithmException
* @throws java.security.KeyStoreException
* @throws java.io.IOException
* @throws java.security.GeneralSecurityException
*/
public static TrustManager[] loadTrustManager(String trustStore) throws java.security.NoSuchAlgorithmException,
java.security.KeyStoreException, java.io.IOException, java.security.GeneralSecurityException {
KeyStore ks = KeyStore.getInstance("JKS");
try (FileInputStream fis = new FileInputStream(trustStore)) {
ks.load(fis, null);
}
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ks);
return tmf.getTrustManagers();
}
/**
* 加载密钥文件
*
* @param keyStore
* @param keyStorePassword
* @return
* @throws java.security.NoSuchAlgorithmException
* @throws java.security.KeyStoreException
* @throws java.security.GeneralSecurityException
* @throws java.security.cert.CertificateException
* @throws java.io.IOException
* @throws java.security.UnrecoverableKeyException
*/
public static KeyManager[] loadKeyManager(String keyStore, String keyStorePassword)
throws java.security.NoSuchAlgorithmException, java.security.KeyStoreException,
java.security.GeneralSecurityException, java.security.cert.CertificateException, java.io.IOException,
java.security.UnrecoverableKeyException {
KeyStore ks = KeyStore.getInstance("JKS");
try (FileInputStream fis = new FileInputStream(keyStore)) {
ks.load(fis, keyStorePassword.toCharArray());
}
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, keyStorePassword.toCharArray());
return kmf.getKeyManagers();
}
}
\ No newline at end of file
package com.x.message.assemble.communicate.mq;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.google.gson.Gson;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.MQKafka;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.message.core.entity.Message;
public class KafkaMQ implements MQInterface {
private static Logger logger = LoggerFactory.getLogger(KafkaMQ.class);
private Producer<String, String> producer = null;
private String topic = "";
public Producer<String, String> getProducer() {
return producer;
}
public void setProducer(Producer<String, String> producer) {
this.producer = producer;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
private KafkaMQ() {
try {
MQKafka configMQ = Config.mq().getKafka();
logger.info("MQ initialize.....");
Properties properties = new Properties();
properties.put("bootstrap.servers", configMQ.getBootstrap_servers());
properties.put("acks", configMQ.getAcks());
properties.put("retries", configMQ.getRetries());
properties.put("batch.size", configMQ.getBatch_size());
properties.put("linger.ms", configMQ.getLinger_ms());
properties.put("buffer.memory", configMQ.getBuffer_memory());
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<String, String>(properties);
this.topic = configMQ.getTopic();
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
}
}
private static class MQHolder{
private static KafkaMQ instance = new KafkaMQ();
}
public static KafkaMQ getInstance(){
return MQHolder.instance;
}
public static void main(String[] args) {
KafkaMQ MQClient = getInstance();
Message msg = new Message();
msg.setBody("body");
msg.setConsumed(false);
msg.setCreateTime(new Date());
msg.setPerson("person");
System.out.println(MQClient.sendMessage(msg));
}
@Override
public boolean sendMessage(Message message) {
try {
Gson gson = new Gson();
String msg = gson.toJson(message);
this.producer.send(new ProducerRecord<String, String>(this.getTopic(), msg));
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
return false;
} finally {
// this.producer.close();
}
return true;
}
public void destroy() {
this.producer.close();
}
}
package com.x.message.assemble.communicate.mq;
import com.x.message.core.entity.Message;
public interface MQInterface {
public boolean sendMessage(Message message);
}
......@@ -15,6 +15,7 @@ import org.quartz.JobExecutionException;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
......@@ -23,13 +24,11 @@ import com.x.base.core.project.tools.ListTools;
import com.x.base.core.project.utils.time.TimeStamp;
import com.x.message.assemble.communicate.Business;
import com.x.message.core.entity.Instant;
import com.x.message.core.entity.Instant_;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
public class Clean extends AbstractJob {
private static Logger logger = LoggerFactory.getLogger(Clean.class);
private static final Logger LOGGER = LoggerFactory.getLogger(Clean.class);
@Override
public void schedule(JobExecutionContext jobExecutionContext) throws Exception {
......@@ -38,10 +37,10 @@ public class Clean extends AbstractJob {
Business business = new Business(emc);
Long instantCount = this.clearInstant(business);
Long messageCount = this.clearMessage(business);
logger.print("清理过期的消息内容,其中主体消息: {} 条, 消息: {} 条, 耗时: {}.", instantCount, messageCount,
stamp.consumingMilliseconds());
LOGGER.info("清理过期的消息内容,其中主体消息: {} 条, 消息: {} 条, 耗时: {}.", () -> instantCount, () -> messageCount,
stamp::consumingMilliseconds);
} catch (Exception e) {
logger.error(e);
LOGGER.error(e);
throw new JobExecutionException(e);
}
}
......@@ -69,7 +68,7 @@ public class Clean extends AbstractJob {
CriteriaQuery<Instant> cq = cb.createQuery(Instant.class);
Root<Instant> root = cq.from(Instant.class);
Date limit = DateUtils.addDays(new Date(), -Config.communicate().clean().getKeep());
Predicate p = cb.lessThan(root.get(Instant_.createTime), limit);
Predicate p = cb.lessThan(root.get(JpaObject_.createTime), limit);
return em.createQuery(cq.select(root).where(p)).setMaxResults(2000).getResultList();
}
......@@ -97,7 +96,7 @@ public class Clean extends AbstractJob {
CriteriaQuery<Message> cq = cb.createQuery(Message.class);
Root<Message> root = cq.from(Message.class);
Date limit = DateUtils.addDays(new Date(), -Config.communicate().clean().getKeep());
Predicate p = cb.lessThan(root.get(Message_.createTime), limit);
Predicate p = cb.lessThan(root.get(JpaObject_.createTime), limit);
return em.createQuery(cq.select(root).where(p)).setMaxResults(200).getResultList();
}
......
//package com.x.message.assemble.communicate.schedule;
//
//import java.util.ArrayList;
//import java.util.List;
//import java.util.Map.Entry;
//
//import javax.websocket.Session;
//
//import org.quartz.Job;
//import org.quartz.JobExecutionContext;
//import org.quartz.JobExecutionException;
//
//import com.x.base.core.project.logger.Logger;
//import com.x.base.core.project.logger.LoggerFactory;
//import com.x.message.assemble.communicate.ws.collaboration.ActionCollaboration;
//
//public class CleanConnections implements Job {
//
// private static Logger logger = LoggerFactory.getLogger(CleanConnections.class);
//
// @Override
// /* 定时清理session已经关闭的用户 */
// public void execute(JobExecutionContext arg0) throws JobExecutionException {
// try {
// List<Session> removes = new ArrayList<>();
// for (Entry<Session, String> entry : ActionCollaboration.clients.entrySet()) {
// if (!entry.getKey().isOpen()) {
// removes.add(entry.getKey());
// }
// }
// for (Session session : removes) {
// ActionCollaboration.clients.remove(session);
// }
// logger.debug("clean {} websocket session, {} online.", removes.size(), ActionCollaboration.clients.size());
// } catch (Exception e) {
// logger.error(e);
// throw new JobExecutionException(e);
// }
// }
//
//}
\ No newline at end of file
package com.x.message.assemble.communicate.schedule;
import org.quartz.JobExecutionContext;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.schedule.AbstractJob;
import com.x.message.assemble.communicate.ThisApplication;
public class TriggerMessageConsumeQueue extends AbstractJob {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerMessageConsumeQueue.class);
@Override
public void schedule(JobExecutionContext jobExecutionContext) throws Exception {
LOGGER.debug("schedule trigger restfulConsumeQueue, mqConsumeQueue, mailConsumeQueue, apiConsumeQueue.");
ThisApplication.restfulConsumeQueue.send(null);
ThisApplication.mqConsumeQueue.send(null);
ThisApplication.mailConsumeQueue.send(null);
ThisApplication.apiConsumeQueue.send(null);
}
}
\ No newline at end of file
package com.x.message.assemble.communicate.schedule;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.base.core.project.schedule.AbstractJob;
import com.x.message.assemble.communicate.ThisApplication;
import com.x.message.core.entity.Message;
public class TriggerMq extends AbstractJob {
private static Logger logger = LoggerFactory.getLogger(Clean.class);
@Override
public void schedule(JobExecutionContext jobExecutionContext) throws Exception {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
if (Config.mq().getEnable()) {
Message message = new Message();
message.setBody("");
message.setType("TriggerMq");
message.setPerson("");
message.setTitle("TriggerMq");
message.setConsumer(MessageConnector.CONSUME_MQ);
message.setConsumed(false);
message.setInstant("");
ThisApplication.mqConsumeQueue.send(message);
}
} catch (Exception e) {
logger.error(e);
throw new JobExecutionException(e);
}
}
}
\ No newline at end of file
......@@ -11,14 +11,16 @@ import javax.persistence.Lob;
import javax.persistence.Table;
import javax.persistence.UniqueConstraint;
import org.apache.openjpa.persistence.jdbc.Index;
import com.x.base.core.entity.JpaObject;
import com.x.base.core.entity.SliceJpaObject;
import com.x.base.core.entity.annotation.CheckPersist;
import com.x.base.core.entity.annotation.ContainerEntity;
import com.x.base.core.project.annotation.FieldDescribe;
import org.apache.openjpa.persistence.Persistent;
import org.apache.openjpa.persistence.jdbc.Index;
import org.apache.openjpa.persistence.jdbc.Strategy;
@Entity
@ContainerEntity(dumpSize = 1000, type = ContainerEntity.Type.log, reference = ContainerEntity.Reference.soft)
@Table(name = PersistenceProperties.Message.table, uniqueConstraints = {
......@@ -51,55 +53,81 @@ public class Message extends SliceJpaObject {
// nothing
}
public static final String title_FIELDNAME = "title";
public MessageProperties getProperties() {
if (null == this.properties) {
this.properties = new MessageProperties();
}
return this.properties;
}
public void setProperties(MessageProperties properties) {
this.properties = properties;
}
public static final String TITLE_FIELDNAME = "title";
@FieldDescribe("通知标题.")
@Column(length = length_255B, name = ColumnNamePrefix + title_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + title_FIELDNAME)
@Column(length = length_255B, name = ColumnNamePrefix + TITLE_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + TITLE_FIELDNAME)
@CheckPersist(allowEmpty = true)
private String title;
public static final String body_FIELDNAME = "body";
public static final String BODY_FIELDNAME = "body";
@FieldDescribe("内容.")
@Lob
@Basic(fetch = FetchType.EAGER)
@Column(length = length_10M, name = ColumnNamePrefix + body_FIELDNAME)
@Column(length = length_10M, name = ColumnNamePrefix + BODY_FIELDNAME)
private String body;
public static final String type_FIELDNAME = "type";
public static final String TYPE_FIELDNAME = "type";
@FieldDescribe("消息类型.")
@Column(length = length_255B, name = ColumnNamePrefix + type_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + type_FIELDNAME)
@Column(length = length_255B, name = ColumnNamePrefix + TYPE_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + TYPE_FIELDNAME)
@CheckPersist(allowEmpty = false)
private String type;
public static final String consumer_FIELDNAME = "consumer";
public static final String CONSUMER_FIELDNAME = "consumer";
@FieldDescribe("消费者.")
@Column(length = length_255B, name = ColumnNamePrefix + consumer_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + consumer_FIELDNAME)
@Column(length = length_255B, name = ColumnNamePrefix + CONSUMER_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + CONSUMER_FIELDNAME)
@CheckPersist(allowEmpty = false)
private String consumer;
public static final String person_FIELDNAME = "person";
public static final String PERSON_FIELDNAME = "person";
@FieldDescribe("通知对象.")
@Column(length = length_255B, name = ColumnNamePrefix + person_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + person_FIELDNAME)
@Column(length = length_255B, name = ColumnNamePrefix + PERSON_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + PERSON_FIELDNAME)
@CheckPersist(allowEmpty = true)
private String person;
public static final String consumed_FIELDNAME = "consumed";
public static final String CONSUMED_FIELDNAME = "consumed";
@FieldDescribe("是否已经消费掉.")
@Column(name = ColumnNamePrefix + consumed_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + consumed_FIELDNAME)
@Column(name = ColumnNamePrefix + CONSUMED_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + CONSUMED_FIELDNAME)
@CheckPersist(allowEmpty = false)
private Boolean consumed;
public static final String instant_FIELDNAME = "instant";
public static final String INSTANT_FIELDNAME = "instant";
@FieldDescribe("主体消息id.")
@Column(length = JpaObject.length_id, name = ColumnNamePrefix + instant_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + instant_FIELDNAME)
@Column(length = JpaObject.length_id, name = ColumnNamePrefix + INSTANT_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + INSTANT_FIELDNAME)
@CheckPersist(allowEmpty = false)
private String instant;
public static final String ITEM_FIELDNAME = "item";
@FieldDescribe("配置项.")
@Column(length = JpaObject.length_255B, name = ColumnNamePrefix + ITEM_FIELDNAME)
@Index(name = TABLE + IndexNameMiddle + ITEM_FIELDNAME)
@CheckPersist(allowEmpty = false)
private String item;
public static final String PROPERTIES_FIELDNAME = "properties";
@FieldDescribe("属性对象存储字段.")
@Persistent
@Strategy(JsonPropertiesValueHandler)
@Column(length = JpaObject.length_10M, name = ColumnNamePrefix + PROPERTIES_FIELDNAME)
@CheckPersist(allowEmpty = true)
private MessageProperties properties;
public String getBody() {
return body;
}
......@@ -152,6 +180,14 @@ public class Message extends SliceJpaObject {
return instant;
}
public String getItem() {
return item;
}
public void setItem(String item) {
this.item = item;
}
public void setInstant(String instant) {
this.instant = instant;
}
......
package com.x.message.core.entity;
import com.x.base.core.entity.JsonProperties;
import com.x.base.core.project.annotation.FieldDescribe;
public class MessageProperties extends JsonProperties {
private static final long serialVersionUID = -4943657103264120657L;
@FieldDescribe("失败次数")
private Integer failure;
@FieldDescribe("错误信息")
private String error;
public Integer getFailure() {
return failure;
}
public void setFailure(Integer failure) {
this.failure = failure;
}
public String getError() {
return error;
}
public void setError(String error) {
this.error = error;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册