提交 176aba55 编写于 作者: hlwwx's avatar hlwwx

'增加发送到MQ,kafka,activeMQ'

上级 89f440e8
......@@ -9,9 +9,16 @@
"clean": {
"enable": true,
"cron": "30 30 6 * * ?",
"keep": 7.0,
"keep": 7,
"###enable": "是否启用###",
"###cron": "定时cron表达式###",
"###keep": "消息保留天数###"
},
"###cronMq": "定时触发发送到消息队列MQ.###",
"cronMq": {
"enable": true,
"cron": "0 */5 * * * ?",
"###enable": "是否启用###",
"###cron": "定时cron表达式###"
}
}
\ No newline at end of file
......@@ -9,6 +9,7 @@
"attachment_editor": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -20,6 +21,7 @@
"attachment_editorCancel": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -31,6 +33,7 @@
"attachment_editorModify": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -42,6 +45,7 @@
"attachment_share": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -53,6 +57,7 @@
"attachment_shareCancel": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -95,6 +100,7 @@
"meeting_delete": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -106,6 +112,7 @@
"meeting_invite": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -139,6 +146,7 @@
"read_create": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -162,6 +170,7 @@
"task_create": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......@@ -177,6 +186,7 @@
"task_press": {
"consumers": [],
"consumersV2": {
"mq": "",
"pms": "",
"zhengwuDingding": "",
"qiyeweixin": "",
......
{
"enable": true,
"mq":"kafka",
"kafka":{
"bootstrap_servers": "localhost:9092",
"topic":"topic-test",
"acks": "all",
"retries": 0,
"batch_size": 16384,
"linger_ms": 1,
"buffer_memory": 33554432,
"key_deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"value_deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"###bootstrap_servers": "服务器地址###",
"###acks": "指定必须有多少个分区副本接收消息,生产者才认为消息写入成功,用户检测数据丢失的可能性###",
"###retries": "生产者从服务器收到的错误有可能是临时性的错误的次数###",
"###batch_size": "该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。###",
"###linger_ms": "该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,增加延迟,提高吞吐量###",
"###buffer_memory": "该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息###",
"###key_deserializer": "key值的序列化类###",
"###value_deserializer": "value的序列化类###"
},
"activeMQ":{
"url":"tcp://127.0.0.1:61616",
"queueName":"queue-test",
"###url": "服务地址,端口默认61616.###",
"###queueName": "要创建的消息名称###"
},
"###enable": "是否启用.###",
"###mq": "消息服务类型.###"
}
\ No newline at end of file
......@@ -43,6 +43,51 @@ public class Communicate extends ConfigObject {
return BooleanUtils.isTrue(calendarEnable);
}
@FieldDescribe("定时触发发送到消息队列MQ.")
private CronMq cronMq;
public CronMq cronMq() {
return this.cronMq == null ? new CronMq() : this.cronMq;
}
public static class CronMq extends ConfigObject {
public static CronMq defaultInstance() {
CronMq o = new CronMq();
return o;
}
public final static Boolean DEFAULT_ENABLE = false;
public final static String DEFAULT_CRON = "0 0 * * * ? *"; //每小时运行一次
@FieldDescribe("是否启用")
private Boolean enable = DEFAULT_ENABLE;
@FieldDescribe("定时cron表达式")
private String cron = DEFAULT_CRON;
public String getCron() {
if (StringUtils.isNotEmpty(this.cron) && CronExpression.isValidExpression(this.cron)) {
return this.cron;
} else {
return DEFAULT_CRON;
}
}
public Boolean getEnable() {
return BooleanUtils.isTrue(this.enable);
}
public void setCron(String cron) {
this.cron = cron;
}
public void setEnable(Boolean enable) {
this.enable = enable;
}
}
@FieldDescribe("清理设置.")
private Clean clean;
......@@ -51,6 +96,7 @@ public class Communicate extends ConfigObject {
}
public static class Clean extends ConfigObject {
private static final long serialVersionUID = 1L;
public static Clean defaultInstance() {
Clean o = new Clean();
......
......@@ -64,6 +64,7 @@ public class Config {
public static final String PATH_CONFIG_WELINK = "config/welink.json";
public static final String PATH_CONFIG_ZHENGWUDINGDING = "config/zhengwuDingding.json";
public static final String PATH_CONFIG_QIYEWEIXIN = "config/qiyeweixin.json";
public static final String PATH_CONFIG_MQ = "config/mq.json";
public static final String PATH_CONFIG_LOGLEVEL = "config/logLevel.json";
public static final String PATH_CONFIG_BINDLOGO = "config/bindLogo.png";
public static final String PATH_CONFIG_SLICE = "config/slice.json";
......@@ -1095,6 +1096,23 @@ public class Config {
return instance().zhengwuDingding;
}
private MQ mq;
public static MQ mq() throws Exception {
if (null == instance().mq) {
synchronized (Config.class) {
if (null == instance().mq) {
MQ obj = BaseTools.readConfigObject(PATH_CONFIG_MQ, MQ.class);
if (null == obj) {
obj = MQ.defaultInstance();
}
instance().mq = obj;
}
}
}
return instance().mq;
}
private Vfs vfs;
public static Vfs vfs() throws Exception {
......
package com.x.base.core.project.config;
import java.io.File;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import com.x.base.core.project.annotation.FieldDescribe;
import com.x.base.core.project.connection.HttpConnection;
import com.x.base.core.project.gson.GsonPropertyObject;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.tools.DefaultCharset;
public class MQ extends ConfigObject {
@FieldDescribe("是否启用.")
private Boolean enable;
@FieldDescribe("消息服务类型")
private String mq;
@FieldDescribe("Kafka服务器配置")
private MQKafka kafka;
@FieldDescribe("ActiveMQ服务器配置")
private MQActive activeMQ;
public static MQ defaultInstance() {
return new MQ();
}
public static final Boolean default_enable = false;
public static final String default_mq = "kafka";
public MQ() {
this.enable = default_enable;
this.mq = default_mq;
}
public Boolean getEnable() {
return BooleanUtils.isTrue(this.enable);
}
public void setEnable(Boolean enable) {
this.enable = enable;
}
public String getMq() {
return mq;
}
public void setMq(String mq) {
this.mq = mq;
}
public MQKafka getKafka() {
return kafka;
}
public void setKafka(MQKafka kafka) {
this.kafka = kafka;
}
public MQActive getActiveMQ() {
return activeMQ;
}
public void setActiveMQ(MQActive activeMQ) {
this.activeMQ = activeMQ;
}
}
package com.x.base.core.project.config;
import java.io.File;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import com.x.base.core.project.annotation.FieldDescribe;
import com.x.base.core.project.connection.HttpConnection;
import com.x.base.core.project.gson.GsonPropertyObject;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.tools.DefaultCharset;
public class MQActive extends ConfigObject {
@FieldDescribe("服务器地址")
private String url;
@FieldDescribe("消息队列名")
private String queueName;
public static MQActive defaultInstance() {
return new MQActive();
}
public static final String default_url = "tcp://127.0.0.1:61616";
public static final String default_queueName = "queue-test";
public MQActive() {
this.url = default_url;
this.queueName = default_queueName;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
}
package com.x.base.core.project.config;
import java.io.File;
import java.util.Calendar;
import java.util.Date;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import com.x.base.core.project.annotation.FieldDescribe;
import com.x.base.core.project.connection.HttpConnection;
import com.x.base.core.project.gson.GsonPropertyObject;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.tools.DefaultCharset;
public class MQKafka extends ConfigObject {
@FieldDescribe("服务器地址")
private String bootstrap_servers;
@FieldDescribe("主题")
private String topic;
@FieldDescribe("指定必须有多少个分区副本接收消息,生产者才认为消息写入成功")
private String acks;
@FieldDescribe("错误的次数")
private Integer retries;
@FieldDescribe("批次可以使用的内存大小")
private Integer batch_size;
@FieldDescribe("等待更多消息加入批次的时间")
private Integer linger_ms;
@FieldDescribe("生产者内存缓冲区的大小")
private Integer buffer_memory;
@FieldDescribe("key值的序列化类")
private String key_deserializer;
@FieldDescribe("value的序列化类")
private String value_deserializer;
public static MQKafka defaultInstance() {
return new MQKafka();
}
public static final String default_bootstrap_servers = "localhost:9092";
public static final String default_topic = "topic-test";
public static final String default_acks = "all";
public static final Integer default_retries = 0;
public static final Integer default_batch_size = 16384;
public static final Integer default_linger_ms= 1;
public static final Integer default_buffer_memory = 33554432;
public static final String default_key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
public static final String default_value_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
public MQKafka() {
this.bootstrap_servers = default_bootstrap_servers;
this.topic = default_topic;
this.acks = default_acks;
this.retries = default_retries;
this.batch_size = default_batch_size;
this.linger_ms = default_linger_ms;
this.buffer_memory= default_buffer_memory;
this.key_deserializer = default_key_deserializer;
this.value_deserializer = default_value_deserializer;
}
public String getBootstrap_servers() {
return StringUtils.isEmpty(bootstrap_servers) ? default_bootstrap_servers : this.bootstrap_servers;
}
public void setBootstrap_servers(String bootstrap_servers) {
this.bootstrap_servers = bootstrap_servers;
}
public String getTopic() {
return StringUtils.isEmpty(topic) ? default_topic : this.topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getAcks() {
return StringUtils.isEmpty(acks) ? default_acks : this.acks;
}
public void setAcks(String acks) {
this.acks = acks;
}
public Integer getRetries() {
return this.retries;
}
public void setRetries(Integer retries) {
this.retries = retries;
}
public Integer getBatch_size() {
return this.batch_size;
}
public void setBatch_size(Integer batch_size) {
this.batch_size = batch_size;
}
public Integer getLinger_ms() {
return this.linger_ms;
}
public void setLinger_ms(Integer linger_ms) {
this.linger_ms = linger_ms;
}
public Integer getBuffer_memory() {
return this.buffer_memory;
}
public void setBuffer_memory(Integer buffer_memory) {
this.buffer_memory = buffer_memory;
}
public String getKey_deserializer() {
return StringUtils.isEmpty(key_deserializer) ? default_key_deserializer : this.key_deserializer ;
}
public void setKey_deserializer(String key_deserializer) {
this.key_deserializer = key_deserializer;
}
public String getValue_deserializer() {
return StringUtils.isEmpty(value_deserializer) ? default_value_deserializer : this.value_deserializer ;
}
public void setValue_deserializer(String value_deserializer) {
this.value_deserializer = value_deserializer;
}
}
......@@ -28,51 +28,51 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
o.put(MessageConnector.TYPE_ATTACHMENT_SHARE,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
o.put(MessageConnector.TYPE_ATTACHMENT_EDITOR,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
o.put(MessageConnector.TYPE_ATTACHMENT_SHARECANCEL,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
o.put(MessageConnector.TYPE_ATTACHMENT_EDITORCANCEL,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
o.put(MessageConnector.TYPE_ATTACHMENT_EDITORMODIFY,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
/* 文件通知结束 */
/* 会议通知 */
o.put(MessageConnector.TYPE_MEETING_INVITE,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
o.put(MessageConnector.TYPE_MEETING_DELETE,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
/* 会议通知结束 */
/* 待办已办通知 */
o.put(MessageConnector.TYPE_TASK_CREATE,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
/* 待办提醒通知 */
o.put(MessageConnector.TYPE_TASK_PRESS,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
o.put(MessageConnector.TYPE_TASK_DELETE, new Message());
......@@ -85,7 +85,7 @@ public class Messages extends ConcurrentSkipListMap<String, Message> {
o.put(MessageConnector.TYPE_READ_CREATE,
new Message(MessageConnector.CONSUME_WS, MessageConnector.CONSUME_PMS,
MessageConnector.CONSUME_DINGDING, MessageConnector.CONSUME_ZHENGWUDINGDING,
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK));
MessageConnector.CONSUME_QIYEWEIXIN, MessageConnector.CONSUME_WELINK,MessageConnector.CONSUME_MQ));
o.put(MessageConnector.TYPE_READ_DELETE, new Message());
......
......@@ -135,6 +135,8 @@ public class MessageConnector {
public static final String CONSUME_QIYEWEIXIN = "qiyeweixin";
public static final String CONSUME_MQ = "mq";
private static Context context;
private static LinkedBlockingQueue<Wrap> connectQueue = new LinkedBlockingQueue<>();
......
......@@ -8,6 +8,7 @@ import javax.persistence.EntityManager;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Expression;
import javax.persistence.criteria.Order;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
......@@ -25,7 +26,6 @@ import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.tools.SortTools;
import com.x.meeting.assemble.control.Business;
import com.x.meeting.assemble.control.WrapTools;
import com.x.meeting.assemble.control.wrapout.WrapOutMeeting;
......@@ -51,8 +51,9 @@ class ActionPaging extends BaseAction {
Root<Meeting> root = cq.from(Meeting.class);
Predicate p = cb.equal(root.get(Meeting_.applicant), effectivePerson.getDistinguishedName());
p = cb.or(p, cb.isMember(effectivePerson.getDistinguishedName(),root.get(Meeting_.invitePersonList)));
//p = cb.or(p, cb.isMember(effectivePerson.getDistinguishedName(),root.get(Meeting_.invitePersonList)));
Expression<List<String>> expression = root.get(Meeting_.invitePersonList);
p = cb.or(p, expression.in(effectivePerson.getDistinguishedName()));
if(!StringUtils.isBlank(wi.getSubject())) {
p = cb.and(p, cb.like(root.get(Meeting_.subject), "%" + wi.getSubject() + "%"));
......@@ -96,6 +97,10 @@ class ActionPaging extends BaseAction {
p = cb.and(p, cb.equal(root.get(Meeting_.confirmStatus), ConfirmStatus.valueOf(wi.getConfirmStatus().trim())));
}
if(!StringUtils.isBlank(wi.getApplicant())) {
p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getApplicant()));
}
if(!StringUtils.isBlank(wi.getInvitePersonList())) {
p = cb.and(p, cb.isMember( wi.getInvitePersonList().trim(),root.get(Meeting_.invitePersonList)));
}
......@@ -127,28 +132,32 @@ class ActionPaging extends BaseAction {
}
cq.select(root.get(Meeting_.id)).where(p).orderBy(order);
cq.distinct(true);
TypedQuery<String> typedQuery = em.createQuery(cq);
int pageIndex = (page-1)*size;
int pageSize = page*size;
int pageSize = size;
typedQuery.setFirstResult(pageIndex);
typedQuery.setMaxResults(pageSize);
//logger.info("typedQuery="+ typedQuery.toString());
ids = typedQuery.getResultList();
CriteriaQuery<Long> cqCount = cb.createQuery(Long.class);
Root<Meeting> rootCount = cqCount.from(Meeting.class);
cqCount.select(cb.countDistinct(rootCount)).where(p);
Long count = em.createQuery(cqCount).getSingleResult().longValue();
//logger.info("count="+ count);
//logger.info("pagingtypedQuery="+ typedQuery.toString());
TypedQuery<String> tqCount = em.createQuery( cq.select(root.get(Meeting_.id)).where(p).distinct(true));
List<String> allid = tqCount.getResultList();
Long tpsize = (long) allid.size();
//logger.info("ids count="+ tpsize);
List<Wo> wos = Wo.copier.copy(emc.list(Meeting.class, ids));
CriteriaQuery<Meeting> cqMeeting = cb.createQuery(Meeting.class);
Predicate pMeeting = cb.isMember(root.get(Meeting_.id), cb.literal(ids));
Root<Meeting> rootMeeting = cqMeeting.from(Meeting.class);
cqMeeting.select(rootMeeting).where(pMeeting).orderBy(order);
List<Meeting> os = em.createQuery(cqMeeting).getResultList();
List<Wo> wos = Wo.copier.copy(os);
WrapTools.decorate(business, wos, effectivePerson);
WrapTools.setAttachment(business, wos);
SortTools.desc(wos, Meeting.startTime_FIELDNAME);
result.setData(wos);
result.setCount(count);
result.setCount(tpsize);
return result;
}
}
......@@ -178,6 +187,9 @@ class ActionPaging extends BaseAction {
@FieldDescribe("会议预定状态.(allow|deny|wait)")
private String confirmStatus;
@FieldDescribe("创建人员.")
private String applicant;
@FieldDescribe("邀请人员,身份,组织.")
private String invitePersonList;
......@@ -240,6 +252,15 @@ class ActionPaging extends BaseAction {
this.completedTime = completedTime;
}
public String getApplicant() {
return applicant;
}
public void setApplicant(String applicant) {
this.applicant = applicant;
}
public String getInvitePersonList() {
return invitePersonList;
}
......
......@@ -25,10 +25,8 @@ import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.tools.SortTools;
import com.x.meeting.assemble.control.Business;
import com.x.meeting.assemble.control.WrapTools;
import com.x.meeting.assemble.control.jaxrs.meeting.ActionPaging.Wo;
import com.x.meeting.assemble.control.wrapout.WrapOutMeeting;
import com.x.meeting.core.entity.ConfirmStatus;
import com.x.meeting.core.entity.Meeting;
......@@ -51,11 +49,14 @@ class ActionPagingManage extends BaseAction {
CriteriaQuery<String> cq = cb.createQuery(String.class);
Root<Meeting> root = cq.from(Meeting.class);
if (StringUtils.isBlank(wi.getDistinguishedName())) {
throw new ExceptionDistinguishedNameEmpty();
Predicate p = cb.isNotNull(root.get(Meeting_.applicant));
if (!StringUtils.isBlank(wi.getDistinguishedName())) {
//throw new ExceptionDistinguishedNameEmpty();
p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName()));
}
Predicate p = cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName());
//Predicate p = cb.equal(root.get(Meeting_.applicant), wi.getDistinguishedName());
if(!StringUtils.isBlank(wi.getSubject())) {
p = cb.and(p, cb.like(root.get(Meeting_.subject), "%" + wi.getSubject() + "%"));
......@@ -99,6 +100,10 @@ class ActionPagingManage extends BaseAction {
p = cb.and(p, cb.equal(root.get(Meeting_.confirmStatus), ConfirmStatus.valueOf(wi.getConfirmStatus().trim())));
}
if(!StringUtils.isBlank(wi.getApplicant())) {
p = cb.and(p, cb.equal(root.get(Meeting_.applicant), wi.getApplicant()));
}
if(!StringUtils.isBlank(wi.getInvitePersonList())) {
p = cb.and(p, cb.isMember( wi.getInvitePersonList().trim(),root.get(Meeting_.invitePersonList)));
}
......@@ -129,31 +134,34 @@ class ActionPagingManage extends BaseAction {
order = cb.desc(root.get("startTime"));
}
cq.select(root.get(Meeting_.id)).where(p).orderBy(order);
cq.select(root.get(Meeting_.id)).where(p).orderBy(order);
cq.distinct(true);
TypedQuery<String> typedQuery = em.createQuery(cq);
int pageIndex = (page-1)*size;
int pageSize = page*size;
int pageSize = size;
typedQuery.setFirstResult(pageIndex);
typedQuery.setMaxResults(pageSize);
//logger.info("typedQuery="+ typedQuery.toString());
ids = typedQuery.getResultList();
CriteriaQuery<Long> cqCount = cb.createQuery(Long.class);
Root<Meeting> rootCount = cqCount.from(Meeting.class);
cqCount.select(cb.countDistinct(rootCount)).where(p);
Long count = em.createQuery(cqCount).getSingleResult().longValue();
// logger.info("count="+ count);
//logger.info("pagingtypedQuery="+ typedQuery.toString());
TypedQuery<String> tqCount = em.createQuery( cq.select(root.get(Meeting_.id)).where(p).distinct(true));
List<String> allid = tqCount.getResultList();
Long tpsize = (long) allid.size();
//logger.info("ids count="+ tpsize);
List<Wo> wos = Wo.copier.copy(emc.list(Meeting.class, ids));
CriteriaQuery<Meeting> cqMeeting = cb.createQuery(Meeting.class);
Predicate pMeeting = cb.isMember(root.get(Meeting_.id), cb.literal(ids));
Root<Meeting> rootMeeting = cqMeeting.from(Meeting.class);
cqMeeting.select(rootMeeting).where(pMeeting).orderBy(order);
List<Meeting> os = em.createQuery(cqMeeting).getResultList();
List<Wo> wos = Wo.copier.copy(os);
WrapTools.decorate(business, wos, effectivePerson);
WrapTools.setAttachment(business, wos);
SortTools.desc(wos, Meeting.startTime_FIELDNAME);
result.setData(wos);
result.setCount(count);
result.setCount(tpsize);
return result;
}
}
......@@ -185,6 +193,9 @@ class ActionPagingManage extends BaseAction {
@FieldDescribe("会议预定状态.(allow|deny|wait)")
private String confirmStatus;
@FieldDescribe("创建人员.")
private String applicant;
@FieldDescribe("邀请人员,身份,组织.")
private String invitePersonList;
......@@ -255,6 +266,15 @@ class ActionPagingManage extends BaseAction {
this.completedTime = completedTime;
}
public String getApplicant() {
return applicant;
}
public void setApplicant(String applicant) {
this.applicant = applicant;
}
public String getInvitePersonList() {
return invitePersonList;
}
......
......@@ -28,6 +28,35 @@
<groupId>o2oa</groupId>
<artifactId>x_message_core_entity</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<!--
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.14.5</version>
</dependency>
</dependencies>
<build>
<plugins>
......
package com.x.message.assemble.communicate;
import com.x.base.core.project.exception.PromptException;
class ExceptionMQMessage extends PromptException {
private static final long serialVersionUID = 4132300948670472899L;
ExceptionMQMessage(Integer retCode, String retMessage) {
super("发送消息队列失败,错误代码:{},错误消息:{}.", retCode, retMessage);
}
}
package com.x.message.assemble.communicate;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Order;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import com.google.gson.*;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.base.core.project.queue.AbstractQueue;
import com.x.message.assemble.communicate.mq.ActiveMQ;
import com.x.message.assemble.communicate.mq.KafkaMQ;
import com.x.message.assemble.communicate.mq.MQInterface;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
public class MQConsumeQueue extends AbstractQueue<Message> {
private static Logger logger = LoggerFactory.getLogger(MQConsumeQueue.class);
protected void execute(Message message) throws Exception {
logger.info("MQConsumeQueue message.getTitle:"+ message.getTitle());
if (Config.mq().getEnable()) {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
Business business = new Business(emc);
MQInterface MQClient;
EntityManager em = business.entityManagerContainer().get(Message.class);
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<Message> cq = cb.createQuery(Message.class);
Root<Message> root = cq.from(Message.class);
Order order = cb.desc(root.get(Message_.createTime));
Predicate p = cb.notEqual(root.get(Message_.consumed), true);
p = cb.and(p, cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_MQ));
logger.info(p.toString());
List<Message> messages = em.createQuery(cq.select(root).where(p).orderBy(order)).setMaxResults(50).getResultList();
if(messages.size()>0) {
if(Config.mq().getMq().equalsIgnoreCase("kafka")) {
MQClient = KafkaMQ.getInstance();
}else {
MQClient = ActiveMQ.getInstance();
}
if(MQClient != null) {
for(Message mes : messages) {
boolean res = MQClient.sendMessage(mes);
if (res == false) {
Gson gson = new Gson();
String msg = gson.toJson(mes);
ExceptionMQMessage e = new ExceptionMQMessage(0, msg);
logger.error(e);
} else {
Message messageEntityObject = emc.find(mes.getId(), Message.class);
if (null != messageEntityObject) {
emc.beginTransaction(Message.class);
messageEntityObject.setConsumed(true);
emc.commit();
}
}
}
}
}
}
}
}
}
......@@ -7,6 +7,8 @@ import com.x.base.core.project.Context;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.message.assemble.communicate.schedule.Clean;
import com.x.message.assemble.communicate.schedule.TriggerMq;
import com.x.message.core.entity.Message;
public class ThisApplication {
......@@ -27,6 +29,8 @@ public class ThisApplication {
public static WeLinkConsumeQueue weLinkConsumeQueue = new WeLinkConsumeQueue();
public static PmsInnerConsumeQueue pmsInnerConsumeQueue = new PmsInnerConsumeQueue();
public static MQConsumeQueue mqConsumeQueue = new MQConsumeQueue();
public static Context context() {
return context;
......@@ -64,9 +68,18 @@ public class ThisApplication {
if (Config.weLink().getEnable() && Config.weLink().getMessageEnable()) {
weLinkConsumeQueue.start();
}
if (Config.mq().getEnable()) {
mqConsumeQueue.start();
}
MessageConnector.start(context());
if (BooleanUtils.isTrue(Config.communicate().cronMq().getEnable())) {
context().schedule(TriggerMq.class,Config.communicate().cronMq().getCron());
}
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -10,7 +10,6 @@ import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.annotation.CheckPersistType;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.http.ActionResult;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.jaxrs.WrapBoolean;
......@@ -112,6 +111,9 @@ class ActionCreate extends BaseAction {
case MessageConnector.CONSUME_WELINK:
message = this.weLinkMessage(effectivePerson, business, cpwi, instant);
break;
case MessageConnector.CONSUME_MQ:
message = this.MQMessage(effectivePerson, business, cpwi, instant);
break;
default:
message = this.defaultMessage(effectivePerson, business, cpwi, consumer, instant);
break;
......@@ -173,6 +175,11 @@ class ActionCreate extends BaseAction {
ThisApplication.pmsInnerConsumeQueue.send(message);
}
break;
case MessageConnector.CONSUME_MQ:
if (Config.mq().getEnable()) {
ThisApplication.mqConsumeQueue.send(message);
}
break;
default:
break;
}
......@@ -290,6 +297,18 @@ class ActionCreate extends BaseAction {
return message;
}
private Message MQMessage(EffectivePerson effectivePerson, Business business, Wi wi, Instant instant) {
Message message = new Message();
message.setBody(Objects.toString(wi.getBody()));
message.setType(wi.getType());
message.setPerson(wi.getPerson());
message.setTitle(wi.getTitle());
message.setConsumer(MessageConnector.CONSUME_MQ);
message.setConsumed(false);
message.setInstant(instant.getId());
return message;
}
private Message defaultMessage(EffectivePerson effectivePerson, Business business, Wi wi, String consumer,
Instant instant) {
Message message = new Message();
......
package com.x.message.assemble.communicate.mq;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.google.gson.Gson;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.MQActive;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.message.core.entity.Message;
public class ActiveMQ implements MQInterface {
private static Logger logger = LoggerFactory.getLogger(ActiveMQ.class);
private Connection connection = null;
private MessageProducer producer = null;
private Session session = null;
private ActiveMQ() {
try {
MQActive configMQ = Config.mq().getActiveMQ();
logger.info("MqActive initialize.....");
String url=configMQ.getUrl();
String queueName=configMQ.getQueueName();
ConnectionFactory factory=new ActiveMQConnectionFactory(url);
this.connection= factory.createConnection();
this.connection.start();
this.session= this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination=session.createQueue(queueName);
this.producer = session.createProducer(destination);
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
}
}
private static class MQHolder{
private static ActiveMQ instance = new ActiveMQ();
}
public static ActiveMQ getInstance(){
return MQHolder.instance;
}
public static void main(String[] args) {
ActiveMQ MQClient = getInstance();
//System.out.println(MQClient.getTopic());
Message msg = new Message();
msg.setBody("body");
msg.setConsumed(false);
msg.setCreateTime(new Date());
msg.setPerson("person");
}
@Override
public boolean sendMessage(Message message) {
try {
Gson gson = new Gson();
String msg = gson.toJson(message);
TextMessage textMessage= this.session.createTextMessage(msg);
this.producer.send(textMessage);
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
return false;
} finally {
}
return true;
}
public void destroy() {
System.out.println("MqActive destroy.....");
try {
this.connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error(e);
}
}
}
\ No newline at end of file
package com.x.message.assemble.communicate.mq;
import java.util.Date;
import java.util.Properties;
import com.google.gson.Gson;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.config.MQ;
import com.x.base.core.project.config.MQKafka;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.message.core.entity.Message;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaMQ implements MQInterface {
private static Logger logger = LoggerFactory.getLogger(KafkaMQ.class);
private Producer<String, String> producer = null;
private String topic = "";
public Producer<String, String> getProducer() {
return producer;
}
public void setProducer(Producer<String, String> producer) {
this.producer = producer;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
private KafkaMQ() {
try {
MQKafka configMQ = Config.mq().getKafka();
logger.info("MQ initialize.....");
Properties properties = new Properties();
properties.put("bootstrap.servers", configMQ.getBootstrap_servers());
properties.put("acks", configMQ.getAcks());
properties.put("retries", configMQ.getRetries());
properties.put("batch.size", configMQ.getBatch_size());
properties.put("linger.ms", configMQ.getLinger_ms());
properties.put("buffer.memory", configMQ.getBuffer_memory());
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<String, String>(properties);
this.topic = configMQ.getTopic();
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
}
}
private static class MQHolder{
private static KafkaMQ instance = new KafkaMQ();
}
public static KafkaMQ getInstance(){
return MQHolder.instance;
}
public static void main(String[] args) {
KafkaMQ MQClient = getInstance();
Message msg = new Message();
msg.setBody("body");
msg.setConsumed(false);
msg.setCreateTime(new Date());
msg.setPerson("person");
System.out.println(MQClient.sendMessage(msg));
}
@Override
public boolean sendMessage(Message message) {
try {
Gson gson = new Gson();
String msg = gson.toJson(message);
this.producer.send(new ProducerRecord<String, String>(this.getTopic(), msg));
} catch (Exception e) {
e.printStackTrace();
logger.error(e);
return false;
} finally {
// this.producer.close();
}
return true;
}
public void destroy() {
this.producer.close();
}
}
package com.x.message.assemble.communicate.mq;
import com.x.message.core.entity.Message;
public interface MQInterface {
public boolean sendMessage(Message message);
}
package com.x.message.assemble.communicate.schedule;
import java.util.Date;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.project.config.Config;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.base.core.project.schedule.AbstractJob;
import com.x.base.core.project.tools.ListTools;
import com.x.base.core.project.utils.time.TimeStamp;
import com.x.message.assemble.communicate.Business;
import com.x.message.assemble.communicate.ThisApplication;
import com.x.message.core.entity.Instant;
import com.x.message.core.entity.Instant_;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
public class TriggerMq extends AbstractJob {
private static Logger logger = LoggerFactory.getLogger(Clean.class);
@Override
public void schedule(JobExecutionContext jobExecutionContext) throws Exception {
try (EntityManagerContainer emc = EntityManagerContainerFactory.instance().create()) {
if (Config.mq().getEnable()) {
Message message = new Message();
message.setBody("");
message.setType("TriggerMq");
message.setPerson("");
message.setTitle("TriggerMq");
message.setConsumer(MessageConnector.CONSUME_MQ);
message.setConsumed(false);
message.setInstant("");
ThisApplication.mqConsumeQueue.send(message);
}
} catch (Exception e) {
logger.error(e);
throw new JobExecutionException(e);
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册