提交 6b97dc22 编写于 作者: R Ray

update wsClients

上级 663ece3d
package com.x.message.assemble.communicate;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.Session;
import org.apache.commons.lang3.BooleanUtils;
import com.x.base.core.project.Context;
......@@ -47,6 +52,12 @@ public class ThisApplication {
public static final TableConsumeQueue tableConsumeQueue = new TableConsumeQueue();
private static final Map<Session, String> WSCLIENTS = new ConcurrentHashMap<>();
public static Map<Session, String> wsClients() {
return WSCLIENTS;
}
public static Context context() {
return context;
}
......
......@@ -566,7 +566,7 @@ class ActionCreate extends BaseAction {
case MessageConnector.CONSUME_CALENDAR:
message = this.v3CalendarMessage(wi, consumer);
break;
// restful, mq, api, mail, jdbc, custom_消息没有其他判断条件
// restful, mq, api, mail, jdbc, table, custom_消息没有其他判断条件
default:
message = this.v3Message(wi, consumer);
break;
......@@ -579,7 +579,7 @@ class ActionCreate extends BaseAction {
private Message v3Message(Wi wi, Consumer consumer) {
Message message = new Message();
message.setBody(Objects.toString(v3load(wi, consumer)));
message.setBody(Objects.toString(v3Load(wi, consumer)));
message.setType(wi.getType());
message.setPerson(wi.getPerson());
message.setTitle(wi.getTitle());
......@@ -654,8 +654,7 @@ class ActionCreate extends BaseAction {
Message message = null;
try {
if (BooleanUtils.isTrue(Config.qiyeweixin().getEnable())
&& BooleanUtils.isTrue(Config.qiyeweixin().getMessageEnable())
&& BooleanUtils.isTrue(v3Filter(wi, consumer))) {
&& BooleanUtils.isTrue(Config.qiyeweixin().getMessageEnable())) {
message = v3Message(wi, consumer);
}
} catch (Exception e) {
......@@ -733,7 +732,7 @@ class ActionCreate extends BaseAction {
return true;
}
private JsonElement v3load(Wi wi, Consumer consumer) {
private JsonElement v3Load(Wi wi, Consumer consumer) {
JsonElement jsonElement = wi.getBody();
try {
if (StringUtils.isNotBlank(consumer.getLoader())) {
......
......@@ -20,7 +20,6 @@ import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.message.assemble.communicate.ThisApplication;
import com.x.message.assemble.communicate.ws.collaboration.ActionCollaboration;
import com.x.message.core.entity.IMConversation;
import com.x.message.core.entity.IMMsg;
import com.x.message.core.entity.Message;
......@@ -73,7 +72,7 @@ public class ActionMsgCreate extends BaseAction {
MessageConnector.send(MessageConnector.TYPE_IM_CREATE, title, person, msg);
// 如果消息接收者没有在线 连接ws 就发送一个推送消息
try {
if (!ActionCollaboration.clients.containsValue(person)) {
if (!ThisApplication.wsClients().containsValue(person)) {
LOGGER.info("向app 推送im消息, person: " + person);
Message message = new Message();
String body = imMessageBody(msg);
......
......@@ -13,7 +13,7 @@ import com.x.base.core.project.jaxrs.WrapBoolean;
import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.WsMessage;
import com.x.message.assemble.communicate.ws.collaboration.ActionCollaboration;
import com.x.message.assemble.communicate.ThisApplication;
class ActionCreate extends BaseAction {
......@@ -28,7 +28,7 @@ class ActionCreate extends BaseAction {
Wo wo = new Wo();
wo.setValue(false);
for (Entry<Session, String> entry : ActionCollaboration.clients.entrySet()) {
for (Entry<Session, String> entry : ThisApplication.wsClients().entrySet()) {
if (StringUtils.equals(entry.getValue(), wi.getPerson())) {
Session session = entry.getKey();
if (session != null && session.isOpen()) {
......
......@@ -3,7 +3,6 @@ package com.x.message.assemble.communicate.ws.collaboration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.persistence.EntityManager;
import javax.persistence.criteria.CriteriaBuilder;
......@@ -23,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
import com.google.gson.JsonElement;
import com.x.base.core.container.EntityManagerContainer;
import com.x.base.core.container.factory.EntityManagerContainerFactory;
import com.x.base.core.entity.JpaObject_;
import com.x.base.core.project.gson.XGsonBuilder;
import com.x.base.core.project.http.EffectivePerson;
import com.x.base.core.project.http.HttpToken;
......@@ -31,25 +31,24 @@ import com.x.base.core.project.logger.Logger;
import com.x.base.core.project.logger.LoggerFactory;
import com.x.base.core.project.message.MessageConnector;
import com.x.base.core.project.message.WsMessage;
import com.x.message.assemble.communicate.ThisApplication;
import com.x.message.core.entity.Message;
import com.x.message.core.entity.Message_;
@ServerEndpoint(value = "/ws/collaboration", configurator = WsConfigurator.class)
public class ActionCollaboration {
private static Logger logger = LoggerFactory.getLogger(ActionCollaboration.class);
public static final ConcurrentHashMap<Session, String> clients = new ConcurrentHashMap<Session, String>();
private static final Logger LOGGER = LoggerFactory.getLogger(ActionCollaboration.class);
@OnOpen
public void open(Session session) {
EffectivePerson effectivePerson = (EffectivePerson) session.getUserProperties().get(HttpToken.X_Person);
logger.debug("@OnOpen: tokenType:{}, distinguishedName:{}.", effectivePerson.getTokenType(),
effectivePerson.getDistinguishedName());
if (TokenType.anonymous.equals(effectivePerson.getTokenType())) {
return;
} else {
clients.put(session, effectivePerson.getDistinguishedName());
LOGGER.debug("@OnOpen: tokenType:{}, distinguishedName:{}.", effectivePerson::getTokenType,
effectivePerson::getDistinguishedName);
if (!TokenType.anonymous.equals(effectivePerson.getTokenType())) {
ThisApplication.wsClients().put(session, effectivePerson.getDistinguishedName());
try {
List<Message> messages = this.load(effectivePerson);
WsMessage ws = null;
......@@ -63,26 +62,27 @@ public class ActionCollaboration {
session.getBasicRemote().sendText(XGsonBuilder.toJson(ws));
}
} catch (Exception e) {
logger.error(e);
LOGGER.error(e);
}
}
}
@OnClose
public void close(Session session, CloseReason reason) throws IOException {
clients.remove(session);
public void close(Session session, CloseReason reason) {
ThisApplication.wsClients().remove(session);
}
@OnError
public void error(Throwable t) throws Throwable {
public void error(Throwable t) {
// nothing
}
@OnMessage
public void message(String input, Session session) throws Exception {
public void message(String input, Session session) throws IOException {
EffectivePerson effectivePerson = (EffectivePerson) session.getUserProperties().get(HttpToken.X_Person);
logger.debug("@OnMessage1 receive: message {}, person:{}, ip:{}, client:{} .", input,
effectivePerson.getDistinguishedName(), effectivePerson.getRemoteAddress(), effectivePerson.getUserAgent());
LOGGER.debug("@OnMessage receive: message {}, person:{}, ip:{}, client:{} .", () -> input,
effectivePerson::getDistinguishedName, effectivePerson::getRemoteAddress,
effectivePerson::getUserAgent);
if (StringUtils.isBlank(input)) {
return;
}
......@@ -102,7 +102,7 @@ public class ActionCollaboration {
Predicate p = cb.equal(root.get(Message_.person), effectivePerson.getDistinguishedName());
p = cb.and(p, cb.equal(root.get(Message_.consumer), MessageConnector.CONSUME_WS));
p = cb.and(p, cb.equal(root.get(Message_.consumed), false));
cq.select(root).where(p).orderBy(cb.desc(root.get(Message_.createTime)));
cq.select(root).where(p).orderBy(cb.desc(root.get(JpaObject_.createTime)));
os = em.createQuery(cq).setMaxResults(10).getResultList();
emc.beginTransaction(Message.class);
for (Message o : os) {
......@@ -110,7 +110,7 @@ public class ActionCollaboration {
}
emc.commit();
} catch (Exception e) {
logger.error(e);
LOGGER.error(e);
}
return os;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册