diff --git a/o2server/configSample/communicate.json b/o2server/configSample/communicate.json index c43e0ad7baaf8ba2627ba40f3f827e88bf200c98..d0167a9e8c33d77f637843525886864e5554fa10 100644 --- a/o2server/configSample/communicate.json +++ b/o2server/configSample/communicate.json @@ -9,9 +9,16 @@ "clean": { "enable": true, "cron": "30 30 6 * * ?", - "keep": 7.0, + "keep": 7, "###enable": "是否启用###", "###cron": "定时cron表达式###", "###keep": "消息保留天数###" + }, + "###cronMq": "定时触发发送到消息队列MQ.###", + "cronMq": { + "enable": true, + "cron": "0 */5 * * * ?", + "###enable": "是否启用###", + "###cron": "定时cron表达式###" } } \ No newline at end of file diff --git a/o2server/configSample/messages.json b/o2server/configSample/messages.json index 323d2b51f1cf0798633ad3dab68af4c244e27466..4f68d92c29a42db9e5c1acc4f3ea3cd187a2b697 100644 --- a/o2server/configSample/messages.json +++ b/o2server/configSample/messages.json @@ -9,6 +9,7 @@ "attachment_editor": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -20,6 +21,7 @@ "attachment_editorCancel": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -31,6 +33,7 @@ "attachment_editorModify": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -42,6 +45,7 @@ "attachment_share": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -53,6 +57,7 @@ "attachment_shareCancel": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -95,6 +100,7 @@ "meeting_delete": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -106,6 +112,7 @@ "meeting_invite": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -139,6 +146,7 @@ "read_create": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -162,6 +170,7 @@ "task_create": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", @@ -177,6 +186,7 @@ "task_press": { "consumers": [], "consumersV2": { + "mq": "", "pms": "", "zhengwuDingding": "", "qiyeweixin": "", diff --git a/o2server/configSample/mq.json b/o2server/configSample/mq.json new file mode 100644 index 0000000000000000000000000000000000000000..9e6803243d21fcfe7c9c67f7c4260937b01c5187 --- /dev/null +++ b/o2server/configSample/mq.json @@ -0,0 +1,31 @@ +{ + "enable": true, + "mq":"kafka", + "kafka":{ + "bootstrap_servers": "localhost:9092", + "topic":"topic-test", + "acks": "all", + "retries": 0, + "batch_size": 16384, + "linger_ms": 1, + "buffer_memory": 33554432, + "key_deserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "value_deserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "###bootstrap_servers": "服务器地址###", + "###acks": "指定必须有多少个分区副本接收消息,生产者才认为消息写入成功,用户检测数据丢失的可能性###", + "###retries": "生产者从服务器收到的错误有可能是临时性的错误的次数###", + "###batch_size": "该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。###", + "###linger_ms": "该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,增加延迟,提高吞吐量###", + "###buffer_memory": "该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息###", + "###key_deserializer": "key值的序列化类###", + "###value_deserializer": "value的序列化类###" + }, + "activeMQ":{ + "url":"tcp://127.0.0.1:61616", + "queueName":"queue-test", + "###url": "服务地址,端口默认61616.###", + "###queueName": "要创建的消息名称###" + }, + "###enable": "是否启用.###", + "###mq": "消息服务类型.###" +} \ No newline at end of file diff --git a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Communicate.java b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Communicate.java index c2a6fff518408b62465f7607e57c306f3d311503..e26c7d08f04184a26e34977da350106cd331263e 100644 --- a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Communicate.java +++ b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Communicate.java @@ -43,6 +43,51 @@ public class Communicate extends ConfigObject { return BooleanUtils.isTrue(calendarEnable); } + @FieldDescribe("定时触发发送到消息队列MQ.") + private CronMq cronMq; + + + public CronMq cronMq() { + return this.cronMq == null ? new CronMq() : this.cronMq; + } + + public static class CronMq extends ConfigObject { + + public static CronMq defaultInstance() { + CronMq o = new CronMq(); + return o; + } + + public final static Boolean DEFAULT_ENABLE = false; + public final static String DEFAULT_CRON = "0 0 * * * ? *"; //每小时运行一次 + + @FieldDescribe("是否启用") + private Boolean enable = DEFAULT_ENABLE; + + @FieldDescribe("定时cron表达式") + private String cron = DEFAULT_CRON; + + public String getCron() { + if (StringUtils.isNotEmpty(this.cron) && CronExpression.isValidExpression(this.cron)) { + return this.cron; + } else { + return DEFAULT_CRON; + } + } + + public Boolean getEnable() { + return BooleanUtils.isTrue(this.enable); + } + + public void setCron(String cron) { + this.cron = cron; + } + + public void setEnable(Boolean enable) { + this.enable = enable; + } + } + @FieldDescribe("清理设置.") private Clean clean; @@ -51,6 +96,7 @@ public class Communicate extends ConfigObject { } public static class Clean extends ConfigObject { + private static final long serialVersionUID = 1L; public static Clean defaultInstance() { Clean o = new Clean(); diff --git a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Config.java b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Config.java index b25cb5cf0f3f821eb673e4c260d17b5e274a9469..5efa44c6cea3017b2772c54cfcbbfa55e3da9262 100644 --- a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Config.java +++ b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Config.java @@ -64,6 +64,7 @@ public class Config { public static final String PATH_CONFIG_WELINK = "config/welink.json"; public static final String PATH_CONFIG_ZHENGWUDINGDING = "config/zhengwuDingding.json"; public static final String PATH_CONFIG_QIYEWEIXIN = "config/qiyeweixin.json"; + public static final String PATH_CONFIG_MQ = "config/mq.json"; public static final String PATH_CONFIG_LOGLEVEL = "config/logLevel.json"; public static final String PATH_CONFIG_BINDLOGO = "config/bindLogo.png"; public static final String PATH_CONFIG_SLICE = "config/slice.json"; @@ -1095,6 +1096,23 @@ public class Config { return instance().zhengwuDingding; } + private MQ mq; + + public static MQ mq() throws Exception { + if (null == instance().mq) { + synchronized (Config.class) { + if (null == instance().mq) { + MQ obj = BaseTools.readConfigObject(PATH_CONFIG_MQ, MQ.class); + if (null == obj) { + obj = MQ.defaultInstance(); + } + instance().mq = obj; + } + } + } + return instance().mq; + } + private Vfs vfs; public static Vfs vfs() throws Exception { diff --git a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQ.java b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQ.java new file mode 100644 index 0000000000000000000000000000000000000000..e5df1e406833d85904b74e5b7fad64dea08d9700 --- /dev/null +++ b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQ.java @@ -0,0 +1,80 @@ +package com.x.base.core.project.config; + +import java.io.File; +import java.util.Calendar; +import java.util.Date; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; + +import com.x.base.core.project.annotation.FieldDescribe; +import com.x.base.core.project.connection.HttpConnection; +import com.x.base.core.project.gson.GsonPropertyObject; +import com.x.base.core.project.gson.XGsonBuilder; +import com.x.base.core.project.tools.DefaultCharset; + +public class MQ extends ConfigObject { + + @FieldDescribe("是否启用.") + private Boolean enable; + + @FieldDescribe("消息服务类型") + private String mq; + + @FieldDescribe("Kafka服务器配置") + private MQKafka kafka; + + @FieldDescribe("ActiveMQ服务器配置") + private MQActive activeMQ; + + public static MQ defaultInstance() { + return new MQ(); + } + + public static final Boolean default_enable = false; + public static final String default_mq = "kafka"; + + public MQ() { + this.enable = default_enable; + this.mq = default_mq; + + } + + public Boolean getEnable() { + return BooleanUtils.isTrue(this.enable); + } + + public void setEnable(Boolean enable) { + this.enable = enable; + } + + public String getMq() { + return mq; + } + + public void setMq(String mq) { + this.mq = mq; + } + + public MQKafka getKafka() { + return kafka; + } + + public void setKafka(MQKafka kafka) { + this.kafka = kafka; + } + + public MQActive getActiveMQ() { + return activeMQ; + } + + public void setActiveMQ(MQActive activeMQ) { + this.activeMQ = activeMQ; + } + + + + + +} diff --git a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java new file mode 100644 index 0000000000000000000000000000000000000000..86e435044a03c578072fea5c8fd9d4166896a9ae --- /dev/null +++ b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQActive.java @@ -0,0 +1,57 @@ +package com.x.base.core.project.config; + +import java.io.File; +import java.util.Calendar; +import java.util.Date; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; + +import com.x.base.core.project.annotation.FieldDescribe; +import com.x.base.core.project.connection.HttpConnection; +import com.x.base.core.project.gson.GsonPropertyObject; +import com.x.base.core.project.gson.XGsonBuilder; +import com.x.base.core.project.tools.DefaultCharset; + +public class MQActive extends ConfigObject { + + @FieldDescribe("服务器地址") + private String url; + + @FieldDescribe("消息队列名") + private String queueName; + + public static MQActive defaultInstance() { + return new MQActive(); + } + + public static final String default_url = "tcp://127.0.0.1:61616"; + public static final String default_queueName = "queue-test"; + + public MQActive() { + this.url = default_url; + this.queueName = default_queueName; + + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + + + +} diff --git a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQKafka.java b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQKafka.java new file mode 100644 index 0000000000000000000000000000000000000000..1cf75da05d9307f507cc055368f55736eb18ba60 --- /dev/null +++ b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/MQKafka.java @@ -0,0 +1,146 @@ +package com.x.base.core.project.config; + +import java.io.File; +import java.util.Calendar; +import java.util.Date; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; + +import com.x.base.core.project.annotation.FieldDescribe; +import com.x.base.core.project.connection.HttpConnection; +import com.x.base.core.project.gson.GsonPropertyObject; +import com.x.base.core.project.gson.XGsonBuilder; +import com.x.base.core.project.tools.DefaultCharset; + +public class MQKafka extends ConfigObject { + + @FieldDescribe("服务器地址") + private String bootstrap_servers; + + @FieldDescribe("主题") + private String topic; + + @FieldDescribe("指定必须有多少个分区副本接收消息,生产者才认为消息写入成功") + private String acks; + + @FieldDescribe("错误的次数") + private Integer retries; + + @FieldDescribe("批次可以使用的内存大小") + private Integer batch_size; + + @FieldDescribe("等待更多消息加入批次的时间") + private Integer linger_ms; + + @FieldDescribe("生产者内存缓冲区的大小") + private Integer buffer_memory; + + @FieldDescribe("key值的序列化类") + private String key_deserializer; + + @FieldDescribe("value的序列化类") + private String value_deserializer; + + public static MQKafka defaultInstance() { + return new MQKafka(); + } + + public static final String default_bootstrap_servers = "localhost:9092"; + public static final String default_topic = "topic-test"; + public static final String default_acks = "all"; + public static final Integer default_retries = 0; + public static final Integer default_batch_size = 16384; + public static final Integer default_linger_ms= 1; + public static final Integer default_buffer_memory = 33554432; + public static final String default_key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer"; + public static final String default_value_deserializer = "org.apache.kafka.common.serialization.StringDeserializer"; + + public MQKafka() { + this.bootstrap_servers = default_bootstrap_servers; + this.topic = default_topic; + this.acks = default_acks; + this.retries = default_retries; + this.batch_size = default_batch_size; + this.linger_ms = default_linger_ms; + this.buffer_memory= default_buffer_memory; + this.key_deserializer = default_key_deserializer; + this.value_deserializer = default_value_deserializer; + } + + + public String getBootstrap_servers() { + return StringUtils.isEmpty(bootstrap_servers) ? default_bootstrap_servers : this.bootstrap_servers; + } + + public void setBootstrap_servers(String bootstrap_servers) { + this.bootstrap_servers = bootstrap_servers; + } + + public String getTopic() { + return StringUtils.isEmpty(topic) ? default_topic : this.topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getAcks() { + return StringUtils.isEmpty(acks) ? default_acks : this.acks; + + } + + public void setAcks(String acks) { + this.acks = acks; + } + + public Integer getRetries() { + return this.retries; + } + + public void setRetries(Integer retries) { + this.retries = retries; + } + + public Integer getBatch_size() { + return this.batch_size; + } + + public void setBatch_size(Integer batch_size) { + this.batch_size = batch_size; + } + + public Integer getLinger_ms() { + return this.linger_ms; + } + + public void setLinger_ms(Integer linger_ms) { + this.linger_ms = linger_ms; + } + + public Integer getBuffer_memory() { + return this.buffer_memory; + } + + public void setBuffer_memory(Integer buffer_memory) { + this.buffer_memory = buffer_memory; + } + + public String getKey_deserializer() { + return StringUtils.isEmpty(key_deserializer) ? default_key_deserializer : this.key_deserializer ; + } + + public void setKey_deserializer(String key_deserializer) { + this.key_deserializer = key_deserializer; + } + + public String getValue_deserializer() { + return StringUtils.isEmpty(value_deserializer) ? default_value_deserializer : this.value_deserializer ; + } + + public void setValue_deserializer(String value_deserializer) { + this.value_deserializer = value_deserializer; + } + +} diff --git a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Messages.java b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Messages.java index e79dd5f0c93f22fd7783b4186415ad855b171ad6..2e36b283b2dff51614b46ad5969832351ecef329 100644 --- a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Messages.java +++ b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/config/Messages.java @@ -28,51 +28,51 @@ public class Messages extends ConcurrentSkipListMap { o.put(MessageConnector.TYPE_ATTACHMENT_SHARE, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); o.put(MessageConnector.TYPE_ATTACHMENT_EDITOR, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); o.put(MessageConnector.TYPE_ATTACHMENT_SHARECANCEL, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); o.put(MessageConnector.TYPE_ATTACHMENT_EDITORCANCEL, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); o.put(MessageConnector.TYPE_ATTACHMENT_EDITORMODIFY, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); /* 文件通知结束 */ /* 会议通知 */ o.put(MessageConnector.TYPE_MEETING_INVITE, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); o.put(MessageConnector.TYPE_MEETING_DELETE, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); /* 会议通知结束 */ /* 待办已办通知 */ o.put(MessageConnector.TYPE_TASK_CREATE, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); /* 待办提醒通知 */ o.put(MessageConnector.TYPE_TASK_PRESS, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); o.put(MessageConnector.TYPE_TASK_DELETE, new Message()); @@ -85,7 +85,7 @@ public class Messages extends ConcurrentSkipListMap { o.put(MessageConnector.TYPE_READ_CREATE, new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS, MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING, - MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK)); + MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ)); o.put(MessageConnector.TYPE_READ_DELETE, new Message()); diff --git a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/message/MessageConnector.java b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/message/MessageConnector.java index 0eb0c8249846cb1f9a4445dcd3ca0a526b7fc7f6..0a4d109d46a663f8928fa90910d4a772b91fed13 100644 --- a/o2server/x_base_core_project/src/main/java/com/x/base/core/project/message/MessageConnector.java +++ b/o2server/x_base_core_project/src/main/java/com/x/base/core/project/message/MessageConnector.java @@ -135,6 +135,8 @@ public class MessageConnector { public static final String CONSUME_QIYEWEIXIN = "qiyeweixin"; + public static final String CONSUME_MQ = "mq"; + private static Context context; private static LinkedBlockingQueue connectQueue = new LinkedBlockingQueue<>(); diff --git a/o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPaging.java b/o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPaging.java index 4ab816c23f7453dc06d907434fc22bd06aa233eb..9226ae2dc36f653a5c12d7453e0837102a110d9a 100644 --- a/o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPaging.java +++ b/o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPaging.java @@ -8,6 +8,7 @@ import javax.persistence.EntityManager; import javax.persistence.TypedQuery; import javax.persistence.criteria.CriteriaBuilder; import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.Expression; import javax.persistence.criteria.Order; import javax.persistence.criteria.Predicate; import javax.persistence.criteria.Root; @@ -25,7 +26,6 @@ import com.x.base.core.project.http.ActionResult; import com.x.base.core.project.http.EffectivePerson; import com.x.base.core.project.logger.Logger; import com.x.base.core.project.logger.LoggerFactory; -import com.x.base.core.project.tools.SortTools; import com.x.meeting.assemble.control.Business; import com.x.meeting.assemble.control.WrapTools; import com.x.meeting.assemble.control.wrapout.WrapOutMeeting; @@ -51,8 +51,9 @@ class ActionPaging extends BaseAction { Root root = cq.from(Meeting.class); Predicate p = cb.equal(root.get(Meeting_.applicant), effectivePerson.getDistinguishedName()); - - p = cb.or(p, cb.isMember(effectivePerson.getDistinguishedName(),root.get(Meeting_.invitePersonList))); + //p = cb.or(p, cb.isMember(effectivePerson.getDistinguishedName(),root.get(Meeting_.invitePersonList))); + Expression> expression = root.get(Meeting_.invitePersonList); + p = cb.or(p, expression.in(effectivePerson.getDistinguishedName())); if(!StringUtils.isBlank(wi.getSubject())) { p = cb.and(p, cb.like(root.get(Meeting_.subject), "%" + wi.getSubject() + "%")); @@ -96,6 +97,10 @@ class ActionPaging extends BaseAction { p = cb.and(p, cb.equal(root.get(Meeting_.confirmStatus), ConfirmStatus.valueOf(wi.getConfirmStatus().trim()))); } + if(!StringUtils.isBlank(wi.getApplicant())) { + p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getApplicant())); + } + if(!StringUtils.isBlank(wi.getInvitePersonList())) { p = cb.and(p, cb.isMember( wi.getInvitePersonList().trim(),root.get(Meeting_.invitePersonList))); } @@ -127,28 +132,32 @@ class ActionPaging extends BaseAction { } cq.select(root.get(Meeting_.id)).where(p).orderBy(order); - + cq.distinct(true); + TypedQuery typedQuery = em.createQuery(cq); int pageIndex = (page-1)*size; - int pageSize = page*size; + int pageSize = size; typedQuery.setFirstResult(pageIndex); typedQuery.setMaxResults(pageSize); - - //logger.info("typedQuery="+ typedQuery.toString()); ids = typedQuery.getResultList(); - - CriteriaQuery cqCount = cb.createQuery(Long.class); - Root rootCount = cqCount.from(Meeting.class); - cqCount.select(cb.countDistinct(rootCount)).where(p); - Long count = em.createQuery(cqCount).getSingleResult().longValue(); - //logger.info("count="+ count); + //logger.info("pagingtypedQuery="+ typedQuery.toString()); + + TypedQuery tqCount = em.createQuery( cq.select(root.get(Meeting_.id)).where(p).distinct(true)); + List allid = tqCount.getResultList(); + Long tpsize = (long) allid.size(); + //logger.info("ids count="+ tpsize); - List wos = Wo.copier.copy(emc.list(Meeting.class, ids)); + CriteriaQuery cqMeeting = cb.createQuery(Meeting.class); + Predicate pMeeting = cb.isMember(root.get(Meeting_.id), cb.literal(ids)); + Root rootMeeting = cqMeeting.from(Meeting.class); + cqMeeting.select(rootMeeting).where(pMeeting).orderBy(order); + List os = em.createQuery(cqMeeting).getResultList(); + + List wos = Wo.copier.copy(os); WrapTools.decorate(business, wos, effectivePerson); WrapTools.setAttachment(business, wos); - SortTools.desc(wos, Meeting.startTime_FIELDNAME); result.setData(wos); - result.setCount(count); + result.setCount(tpsize); return result; } } @@ -178,6 +187,9 @@ class ActionPaging extends BaseAction { @FieldDescribe("会议预定状态.(allow|deny|wait)") private String confirmStatus; + @FieldDescribe("创建人员.") + private String applicant; + @FieldDescribe("邀请人员,身份,组织.") private String invitePersonList; @@ -240,6 +252,15 @@ class ActionPaging extends BaseAction { this.completedTime = completedTime; } + public String getApplicant() { + return applicant; + } + + public void setApplicant(String applicant) { + this.applicant = applicant; + } + + public String getInvitePersonList() { return invitePersonList; } diff --git a/o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPagingManage.java b/o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPagingManage.java index 8792bd4f0966eea946b56482352e326d46c26d58..b2ddfe76529630c245a70f1ca9febd7f5779a2e6 100644 --- a/o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPagingManage.java +++ b/o2server/x_meeting_assemble_control/src/main/java/com/x/meeting/assemble/control/jaxrs/meeting/ActionPagingManage.java @@ -25,10 +25,8 @@ import com.x.base.core.project.http.ActionResult; import com.x.base.core.project.http.EffectivePerson; import com.x.base.core.project.logger.Logger; import com.x.base.core.project.logger.LoggerFactory; -import com.x.base.core.project.tools.SortTools; import com.x.meeting.assemble.control.Business; import com.x.meeting.assemble.control.WrapTools; -import com.x.meeting.assemble.control.jaxrs.meeting.ActionPaging.Wo; import com.x.meeting.assemble.control.wrapout.WrapOutMeeting; import com.x.meeting.core.entity.ConfirmStatus; import com.x.meeting.core.entity.Meeting; @@ -51,11 +49,14 @@ class ActionPagingManage extends BaseAction { CriteriaQuery cq = cb.createQuery(String.class); Root root = cq.from(Meeting.class); - if (StringUtils.isBlank(wi.getDistinguishedName())) { - throw new ExceptionDistinguishedNameEmpty(); + Predicate p = cb.isNotNull(root.get(Meeting_.applicant)); + + if (!StringUtils.isBlank(wi.getDistinguishedName())) { + //throw new ExceptionDistinguishedNameEmpty(); + p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName())); } - Predicate p = cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName()); + //Predicate p = cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName()); if(!StringUtils.isBlank(wi.getSubject())) { p = cb.and(p, cb.like(root.get(Meeting_.subject), "%" + wi.getSubject() + "%")); @@ -99,6 +100,10 @@ class ActionPagingManage extends BaseAction { p = cb.and(p, cb.equal(root.get(Meeting_.confirmStatus), ConfirmStatus.valueOf(wi.getConfirmStatus().trim()))); } + if(!StringUtils.isBlank(wi.getApplicant())) { + p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getApplicant())); + } + if(!StringUtils.isBlank(wi.getInvitePersonList())) { p = cb.and(p, cb.isMember( wi.getInvitePersonList().trim(),root.get(Meeting_.invitePersonList))); } @@ -129,31 +134,34 @@ class ActionPagingManage extends BaseAction { order = cb.desc(root.get("startTime")); } - cq.select(root.get(Meeting_.id)).where(p).orderBy(order); - + cq.select(root.get(Meeting_.id)).where(p).orderBy(order); + cq.distinct(true); + TypedQuery typedQuery = em.createQuery(cq); int pageIndex = (page-1)*size; - int pageSize = page*size; + int pageSize = size; typedQuery.setFirstResult(pageIndex); typedQuery.setMaxResults(pageSize); - - //logger.info("typedQuery="+ typedQuery.toString()); ids = typedQuery.getResultList(); - - CriteriaQuery cqCount = cb.createQuery(Long.class); - Root rootCount = cqCount.from(Meeting.class); - cqCount.select(cb.countDistinct(rootCount)).where(p); - Long count = em.createQuery(cqCount).getSingleResult().longValue(); - // logger.info("count="+ count); + //logger.info("pagingtypedQuery="+ typedQuery.toString()); + + TypedQuery tqCount = em.createQuery( cq.select(root.get(Meeting_.id)).where(p).distinct(true)); + List allid = tqCount.getResultList(); + Long tpsize = (long) allid.size(); + //logger.info("ids count="+ tpsize); - List wos = Wo.copier.copy(emc.list(Meeting.class, ids)); + CriteriaQuery cqMeeting = cb.createQuery(Meeting.class); + Predicate pMeeting = cb.isMember(root.get(Meeting_.id), cb.literal(ids)); + Root rootMeeting = cqMeeting.from(Meeting.class); + cqMeeting.select(rootMeeting).where(pMeeting).orderBy(order); + List os = em.createQuery(cqMeeting).getResultList(); + + List wos = Wo.copier.copy(os); WrapTools.decorate(business, wos, effectivePerson); WrapTools.setAttachment(business, wos); - SortTools.desc(wos, Meeting.startTime_FIELDNAME); result.setData(wos); - result.setCount(count); + result.setCount(tpsize); return result; - } } @@ -185,6 +193,9 @@ class ActionPagingManage extends BaseAction { @FieldDescribe("会议预定状态.(allow|deny|wait)") private String confirmStatus; + @FieldDescribe("创建人员.") + private String applicant; + @FieldDescribe("邀请人员,身份,组织.") private String invitePersonList; @@ -255,6 +266,15 @@ class ActionPagingManage extends BaseAction { this.completedTime = completedTime; } + public String getApplicant() { + return applicant; + } + + public void setApplicant(String applicant) { + this.applicant = applicant; + } + + public String getInvitePersonList() { return invitePersonList; } diff --git a/o2server/x_message_assemble_communicate/pom.xml b/o2server/x_message_assemble_communicate/pom.xml index e2ed39b5ba5b6037c18304667629f9be50922b23..c21e2d73b476a32e767c7062a037c693bdaa10d9 100644 --- a/o2server/x_message_assemble_communicate/pom.xml +++ b/o2server/x_message_assemble_communicate/pom.xml @@ -28,6 +28,35 @@ o2oa x_message_core_entity + + + org.apache.kafka + kafka-clients + 2.6.0 + + + + org.apache.activemq + activemq-client + 5.14.5 + + diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ExceptionMQMessage.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ExceptionMQMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..100da90881164f1295d8b275d5bb937780bac463 --- /dev/null +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ExceptionMQMessage.java @@ -0,0 +1,12 @@ +package com.x.message.assemble.communicate; + +import com.x.base.core.project.exception.PromptException; + +class ExceptionMQMessage extends PromptException { + + private static final long serialVersionUID = 4132300948670472899L; + + ExceptionMQMessage(Integer retCode, String retMessage) { + super("发送消息队列失败,错误代码:{},错误消息:{}.", retCode, retMessage); + } +} diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/MQConsumeQueue.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/MQConsumeQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..2224b2cd93334eb090047e018b2a9b368ff7f445 --- /dev/null +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/MQConsumeQueue.java @@ -0,0 +1,76 @@ +package com.x.message.assemble.communicate; + +import java.util.List; + +import javax.persistence.EntityManager; +import javax.persistence.criteria.CriteriaBuilder; +import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.Order; +import javax.persistence.criteria.Predicate; +import javax.persistence.criteria.Root; + +import com.google.gson.*; +import com.x.base.core.container.EntityManagerContainer; +import com.x.base.core.container.factory.EntityManagerContainerFactory; +import com.x.base.core.project.config.Config; +import com.x.base.core.project.logger.Logger; +import com.x.base.core.project.logger.LoggerFactory; +import com.x.base.core.project.message.MessageConnector; +import com.x.base.core.project.queue.AbstractQueue; +import com.x.message.assemble.communicate.mq.ActiveMQ; +import com.x.message.assemble.communicate.mq.KafkaMQ; +import com.x.message.assemble.communicate.mq.MQInterface; +import com.x.message.core.entity.Message; +import com.x.message.core.entity.Message_; + + +public class MQConsumeQueue extends AbstractQueue { + + private static Logger logger = LoggerFactory.getLogger(MQConsumeQueue.class); + + protected void execute(Message message) throws Exception { + logger.info("MQConsumeQueue message.getTitle:"+ message.getTitle()); + if (Config.mq().getEnable()) { + try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) { + Business business = new Business(emc); + MQInterface MQClient; + EntityManager em = business.entityManagerContainer().get(Message.class); + CriteriaBuilder cb = em.getCriteriaBuilder(); + CriteriaQuery cq = cb.createQuery(Message.class); + Root root = cq.from(Message.class); + + Order order = cb.desc(root.get(Message_.createTime)); + Predicate p = cb.notEqual(root.get(Message_.consumed), true); + + p = cb.and(p, cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_MQ)); + logger.info(p.toString()); + List messages = em.createQuery(cq.select(root).where(p).orderBy(order)).setMaxResults(50).getResultList(); + if(messages.size()>0) { + if(Config.mq().getMq().equalsIgnoreCase("kafka")) { + MQClient = KafkaMQ.getInstance(); + }else { + MQClient = ActiveMQ.getInstance(); + } + if(MQClient != null) { + for(Message mes : messages) { + boolean res = MQClient.sendMessage(mes); + if (res == false) { + Gson gson = new Gson(); + String msg = gson.toJson(mes); + ExceptionMQMessage e = new ExceptionMQMessage(0, msg); + logger.error(e); + } else { + Message messageEntityObject = emc.find(mes.getId(), Message.class); + if (null != messageEntityObject) { + emc.beginTransaction(Message.class); + messageEntityObject.setConsumed(true); + emc.commit(); + } + } + } + } + } + } + } + } +} diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ThisApplication.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ThisApplication.java index e42452dad09bdd64e98fc7518638326bff8ba918..b6f767bc398c8497e98da2d62d69ce28a853bd54 100644 --- a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ThisApplication.java +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/ThisApplication.java @@ -7,6 +7,8 @@ import com.x.base.core.project.Context; import com.x.base.core.project.config.Config; import com.x.base.core.project.logger.LoggerFactory; import com.x.message.assemble.communicate.schedule.Clean; +import com.x.message.assemble.communicate.schedule.TriggerMq; +import com.x.message.core.entity.Message; public class ThisApplication { @@ -27,6 +29,8 @@ public class ThisApplication { public static WeLinkConsumeQueue weLinkConsumeQueue = new WeLinkConsumeQueue(); public static PmsInnerConsumeQueue pmsInnerConsumeQueue = new PmsInnerConsumeQueue(); + + public static MQConsumeQueue mqConsumeQueue = new MQConsumeQueue(); public static Context context() { return context; @@ -64,9 +68,18 @@ public class ThisApplication { if (Config.weLink().getEnable() && Config.weLink().getMessageEnable()) { weLinkConsumeQueue.start(); } - + + if (Config.mq().getEnable()) { + mqConsumeQueue.start(); + } + MessageConnector.start(context()); - + + + if (BooleanUtils.isTrue(Config.communicate().cronMq().getEnable())) { + context().schedule(TriggerMq.class,Config.communicate().cronMq().getCron()); + } + } catch (Exception e) { e.printStackTrace(); } diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/jaxrs/connector/ActionCreate.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/jaxrs/connector/ActionCreate.java index 01031038d94020b044fa503bc0a288f08cae66f1..79ae6e9837fa92ad6154e21c4195cdc8818c9881 100644 --- a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/jaxrs/connector/ActionCreate.java +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/jaxrs/connector/ActionCreate.java @@ -10,7 +10,6 @@ import com.x.base.core.container.EntityManagerContainer; import com.x.base.core.container.factory.EntityManagerContainerFactory; import com.x.base.core.entity.annotation.CheckPersistType; import com.x.base.core.project.config.Config; -import com.x.base.core.project.gson.XGsonBuilder; import com.x.base.core.project.http.ActionResult; import com.x.base.core.project.http.EffectivePerson; import com.x.base.core.project.jaxrs.WrapBoolean; @@ -112,6 +111,9 @@ class ActionCreate extends BaseAction { case MessageConnector.CONSUME_WELINK: message = this.weLinkMessage(effectivePerson, business, cpwi, instant); break; + case MessageConnector.CONSUME_MQ: + message = this.MQMessage(effectivePerson, business, cpwi, instant); + break; default: message = this.defaultMessage(effectivePerson, business, cpwi, consumer, instant); break; @@ -173,6 +175,11 @@ class ActionCreate extends BaseAction { ThisApplication.pmsInnerConsumeQueue.send(message); } break; + case MessageConnector.CONSUME_MQ: + if (Config.mq().getEnable()) { + ThisApplication.mqConsumeQueue.send(message); + } + break; default: break; } @@ -290,6 +297,18 @@ class ActionCreate extends BaseAction { return message; } + private Message MQMessage(EffectivePerson effectivePerson, Business business, Wi wi, Instant instant) { + Message message = new Message(); + message.setBody(Objects.toString(wi.getBody())); + message.setType(wi.getType()); + message.setPerson(wi.getPerson()); + message.setTitle(wi.getTitle()); + message.setConsumer(MessageConnector.CONSUME_MQ); + message.setConsumed(false); + message.setInstant(instant.getId()); + return message; + } + private Message defaultMessage(EffectivePerson effectivePerson, Business business, Wi wi, String consumer, Instant instant) { Message message = new Message(); diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java new file mode 100644 index 0000000000000000000000000000000000000000..f4ea4493ef88900ce62020cc47b56d337341f3ff --- /dev/null +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/ActiveMQ.java @@ -0,0 +1,95 @@ +package com.x.message.assemble.communicate.mq; + +import java.util.Date; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import com.google.gson.Gson; +import com.x.base.core.project.config.Config; +import com.x.base.core.project.config.MQActive; +import com.x.base.core.project.logger.Logger; +import com.x.base.core.project.logger.LoggerFactory; +import com.x.message.core.entity.Message; + +public class ActiveMQ implements MQInterface { + + private static Logger logger = LoggerFactory.getLogger(ActiveMQ.class); + private Connection connection = null; + private MessageProducer producer = null; + private Session session = null; + + private ActiveMQ() { + try { + MQActive configMQ = Config.mq().getActiveMQ(); + logger.info("MqActive initialize....."); + + String url=configMQ.getUrl(); + String queueName=configMQ.getQueueName(); + + ConnectionFactory factory=new ActiveMQConnectionFactory(url); + this.connection= factory.createConnection(); + this.connection.start(); + this.session= this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination=session.createQueue(queueName); + this.producer = session.createProducer(destination); + } catch (Exception e) { + e.printStackTrace(); + logger.error(e); + } + + } + + private static class MQHolder{ + private static ActiveMQ instance = new ActiveMQ(); + } + + + public static ActiveMQ getInstance(){ + return MQHolder.instance; + } + + + public static void main(String[] args) { + ActiveMQ MQClient = getInstance(); + //System.out.println(MQClient.getTopic()); + Message msg = new Message(); + msg.setBody("body"); + msg.setConsumed(false); + msg.setCreateTime(new Date()); + msg.setPerson("person"); + } + + @Override + public boolean sendMessage(Message message) { + try { + Gson gson = new Gson(); + String msg = gson.toJson(message); + TextMessage textMessage= this.session.createTextMessage(msg); + this.producer.send(textMessage); + } catch (Exception e) { + e.printStackTrace(); + logger.error(e); + return false; + } finally { + + } + return true; + } + + public void destroy() { + System.out.println("MqActive destroy....."); + try { + this.connection.close(); + } catch (JMSException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + logger.error(e); + } + } +} \ No newline at end of file diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/KafkaMQ.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/KafkaMQ.java new file mode 100644 index 0000000000000000000000000000000000000000..36f693704a1a28a44a94b1b23fba059165e40756 --- /dev/null +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/KafkaMQ.java @@ -0,0 +1,105 @@ +package com.x.message.assemble.communicate.mq; + +import java.util.Date; +import java.util.Properties; + +import com.google.gson.Gson; +import com.x.base.core.project.config.Config; +import com.x.base.core.project.config.MQ; +import com.x.base.core.project.config.MQKafka; +import com.x.base.core.project.logger.Logger; +import com.x.base.core.project.logger.LoggerFactory; +import com.x.message.core.entity.Message; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; + +public class KafkaMQ implements MQInterface { + + private static Logger logger = LoggerFactory.getLogger(KafkaMQ.class); + + private Producer producer = null; + private String topic = ""; + + public Producer getProducer() { + return producer; + } + + public void setProducer(Producer producer) { + this.producer = producer; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + private KafkaMQ() { + try { + MQKafka configMQ = Config.mq().getKafka(); + logger.info("MQ initialize....."); + Properties properties = new Properties(); + properties.put("bootstrap.servers", configMQ.getBootstrap_servers()); + properties.put("acks", configMQ.getAcks()); + properties.put("retries", configMQ.getRetries()); + properties.put("batch.size", configMQ.getBatch_size()); + properties.put("linger.ms", configMQ.getLinger_ms()); + properties.put("buffer.memory", configMQ.getBuffer_memory()); + + properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + this.producer = new KafkaProducer(properties); + + this.topic = configMQ.getTopic(); + + } catch (Exception e) { + e.printStackTrace(); + logger.error(e); + } + + } + + private static class MQHolder{ + private static KafkaMQ instance = new KafkaMQ(); + } + + + public static KafkaMQ getInstance(){ + return MQHolder.instance; + } + + public static void main(String[] args) { + KafkaMQ MQClient = getInstance(); + Message msg = new Message(); + msg.setBody("body"); + msg.setConsumed(false); + msg.setCreateTime(new Date()); + msg.setPerson("person"); + System.out.println(MQClient.sendMessage(msg)); + } + + @Override + public boolean sendMessage(Message message) { + try { + Gson gson = new Gson(); + String msg = gson.toJson(message); + this.producer.send(new ProducerRecord(this.getTopic(), msg)); + } catch (Exception e) { + e.printStackTrace(); + logger.error(e); + return false; + } finally { + // this.producer.close(); + } + return true; + } + + public void destroy() { + this.producer.close(); + } +} diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/MQInterface.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/MQInterface.java new file mode 100644 index 0000000000000000000000000000000000000000..fd9f8b5747dec80a72d39fc27e82498692025370 --- /dev/null +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/mq/MQInterface.java @@ -0,0 +1,7 @@ +package com.x.message.assemble.communicate.mq; + +import com.x.message.core.entity.Message; + +public interface MQInterface { + public boolean sendMessage(Message message); +} diff --git a/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/schedule/TriggerMq.java b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/schedule/TriggerMq.java new file mode 100644 index 0000000000000000000000000000000000000000..377bde22c6ec1b78e518182fd0309812c71e1ef0 --- /dev/null +++ b/o2server/x_message_assemble_communicate/src/main/java/com/x/message/assemble/communicate/schedule/TriggerMq.java @@ -0,0 +1,58 @@ +package com.x.message.assemble.communicate.schedule; + +import java.util.Date; +import java.util.List; +import javax.persistence.EntityManager; +import javax.persistence.criteria.CriteriaBuilder; +import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.Predicate; +import javax.persistence.criteria.Root; + +import org.apache.commons.lang3.time.DateUtils; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; + +import com.x.base.core.container.EntityManagerContainer; +import com.x.base.core.container.factory.EntityManagerContainerFactory; +import com.x.base.core.project.config.Config; +import com.x.base.core.project.logger.Logger; +import com.x.base.core.project.logger.LoggerFactory; +import com.x.base.core.project.message.MessageConnector; +import com.x.base.core.project.schedule.AbstractJob; +import com.x.base.core.project.tools.ListTools; +import com.x.base.core.project.utils.time.TimeStamp; +import com.x.message.assemble.communicate.Business; +import com.x.message.assemble.communicate.ThisApplication; +import com.x.message.core.entity.Instant; +import com.x.message.core.entity.Instant_; +import com.x.message.core.entity.Message; +import com.x.message.core.entity.Message_; + + +public class TriggerMq extends AbstractJob { + + private static Logger logger = LoggerFactory.getLogger(Clean.class); + + @Override + public void schedule(JobExecutionContext jobExecutionContext) throws Exception { + try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) { + + if (Config.mq().getEnable()) { + Message message = new Message(); + message.setBody(""); + message.setType("TriggerMq"); + message.setPerson(""); + message.setTitle("TriggerMq"); + message.setConsumer(MessageConnector.CONSUME_MQ); + message.setConsumed(false); + message.setInstant(""); + ThisApplication.mqConsumeQueue.send(message); + } + + } catch (Exception e) { + logger.error(e); + throw new JobExecutionException(e); + } + } + +} \ No newline at end of file