提交 ac83c953 编写于 作者: N Nikita Koksharov

Feature - broadcastSessionUpdates setting added to Tomcat Session Manager. #3596

上级 ee20d528
...@@ -71,14 +71,16 @@ public class RedissonSession extends StandardSession { ...@@ -71,14 +71,16 @@ public class RedissonSession extends StandardSession {
private Set<String> removedAttributes = Collections.emptySet(); private Set<String> removedAttributes = Collections.emptySet();
private final boolean broadcastSessionEvents; private final boolean broadcastSessionEvents;
private final boolean broadcastSessionUpdates;
public RedissonSession(RedissonSessionManager manager, ReadMode readMode, UpdateMode updateMode, boolean broadcastSessionEvents) { public RedissonSession(RedissonSessionManager manager, ReadMode readMode, UpdateMode updateMode, boolean broadcastSessionEvents, boolean broadcastSessionUpdates) {
super(manager); super(manager);
this.redissonManager = manager; this.redissonManager = manager;
this.readMode = readMode; this.readMode = readMode;
this.updateMode = updateMode; this.updateMode = updateMode;
this.topic = redissonManager.getTopic(); this.topic = redissonManager.getTopic();
this.broadcastSessionEvents = broadcastSessionEvents; this.broadcastSessionEvents = broadcastSessionEvents;
this.broadcastSessionUpdates = broadcastSessionUpdates;
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
...@@ -183,7 +185,7 @@ public class RedissonSession extends StandardSession { ...@@ -183,7 +185,7 @@ public class RedissonSession extends StandardSession {
} else { } else {
map.delete(); map.delete();
} }
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(new AttributesClearMessage(redissonManager.getNodeId(), getId())); topic.publish(new AttributesClearMessage(redissonManager.getNodeId(), getId()));
} }
map = null; map = null;
...@@ -201,7 +203,7 @@ public class RedissonSession extends StandardSession { ...@@ -201,7 +203,7 @@ public class RedissonSession extends StandardSession {
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime); newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime); newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime);
map.putAll(newMap); map.putAll(newMap);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
} }
} }
...@@ -253,7 +255,7 @@ public class RedissonSession extends StandardSession { ...@@ -253,7 +255,7 @@ public class RedissonSession extends StandardSession {
return; return;
} }
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder())); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
...@@ -319,7 +321,7 @@ public class RedissonSession extends StandardSession { ...@@ -319,7 +321,7 @@ public class RedissonSession extends StandardSession {
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime); newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime); newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime);
map.putAll(newMap); map.putAll(newMap);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
} }
expireSession(); expireSession();
...@@ -376,7 +378,7 @@ public class RedissonSession extends StandardSession { ...@@ -376,7 +378,7 @@ public class RedissonSession extends StandardSession {
private void removeRedisAttribute(String name) { private void removeRedisAttribute(String name) {
if (updateMode == UpdateMode.DEFAULT && map != null) { if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name); map.fastRemove(name);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<String>(Arrays.asList(name)))); topic.publish(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<String>(Arrays.asList(name))));
} }
} }
...@@ -425,7 +427,7 @@ public class RedissonSession extends StandardSession { ...@@ -425,7 +427,7 @@ public class RedissonSession extends StandardSession {
map.putAll(newMap); map.putAll(newMap);
map.fastRemove(removedAttributes.toArray(new String[0])); map.fastRemove(removedAttributes.toArray(new String[0]));
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
......
...@@ -48,14 +48,15 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -48,14 +48,15 @@ public class RedissonSessionManager extends ManagerBase {
private final Log log = LogFactory.getLog(RedissonSessionManager.class); private final Log log = LogFactory.getLog(RedissonSessionManager.class);
private RedissonClient redisson; protected RedissonClient redisson;
private String configPath; private String configPath;
private ReadMode readMode = ReadMode.REDIS; private ReadMode readMode = ReadMode.REDIS;
private UpdateMode updateMode = UpdateMode.DEFAULT; private UpdateMode updateMode = UpdateMode.DEFAULT;
private String keyPrefix = ""; protected String keyPrefix = "";
private boolean broadcastSessionEvents = false; private boolean broadcastSessionEvents = false;
private boolean broadcastSessionUpdates = true;
private final String nodeId = UUID.randomUUID().toString(); private final String nodeId = UUID.randomUUID().toString();
...@@ -80,7 +81,15 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -80,7 +81,15 @@ public class RedissonSessionManager extends ManagerBase {
public void setBroadcastSessionEvents(boolean replicateSessionEvents) { public void setBroadcastSessionEvents(boolean replicateSessionEvents) {
this.broadcastSessionEvents = replicateSessionEvents; this.broadcastSessionEvents = replicateSessionEvents;
} }
public boolean isBroadcastSessionUpdates() {
return broadcastSessionUpdates;
}
public void setBroadcastSessionUpdates(boolean broadcastSessionUpdates) {
this.broadcastSessionUpdates = broadcastSessionUpdates;
}
public String getReadMode() { public String getReadMode() {
return readMode.toString(); return readMode.toString();
} }
...@@ -194,7 +203,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -194,7 +203,7 @@ public class RedissonSessionManager extends ManagerBase {
@Override @Override
public Session createEmptySession() { public Session createEmptySession() {
return new RedissonSession(this, readMode, updateMode, broadcastSessionEvents); return new RedissonSession(this, readMode, updateMode, broadcastSessionEvents, this.broadcastSessionUpdates);
} }
@Override @Override
...@@ -259,7 +268,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -259,7 +268,7 @@ public class RedissonSessionManager extends ManagerBase {
} }
} }
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates || broadcastSessionEvents) {
RTopic updatesTopic = getTopic(); RTopic updatesTopic = getTopic();
messageListener = new MessageListener<AttributeMessage>() { messageListener = new MessageListener<AttributeMessage>() {
......
...@@ -71,14 +71,16 @@ public class RedissonSession extends StandardSession { ...@@ -71,14 +71,16 @@ public class RedissonSession extends StandardSession {
private Set<String> removedAttributes = Collections.emptySet(); private Set<String> removedAttributes = Collections.emptySet();
private final boolean broadcastSessionEvents; private final boolean broadcastSessionEvents;
private final boolean broadcastSessionUpdates;
public RedissonSession(RedissonSessionManager manager, ReadMode readMode, UpdateMode updateMode, boolean broadcastSessionEvents) { public RedissonSession(RedissonSessionManager manager, ReadMode readMode, UpdateMode updateMode, boolean broadcastSessionEvents, boolean broadcastSessionUpdates) {
super(manager); super(manager);
this.redissonManager = manager; this.redissonManager = manager;
this.readMode = readMode; this.readMode = readMode;
this.updateMode = updateMode; this.updateMode = updateMode;
this.topic = redissonManager.getTopic(); this.topic = redissonManager.getTopic();
this.broadcastSessionEvents = broadcastSessionEvents; this.broadcastSessionEvents = broadcastSessionEvents;
this.broadcastSessionUpdates = broadcastSessionUpdates;
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
...@@ -183,7 +185,7 @@ public class RedissonSession extends StandardSession { ...@@ -183,7 +185,7 @@ public class RedissonSession extends StandardSession {
} else { } else {
map.delete(); map.delete();
} }
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(new AttributesClearMessage(redissonManager.getNodeId(), getId())); topic.publish(new AttributesClearMessage(redissonManager.getNodeId(), getId()));
} }
map = null; map = null;
...@@ -201,7 +203,7 @@ public class RedissonSession extends StandardSession { ...@@ -201,7 +203,7 @@ public class RedissonSession extends StandardSession {
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime); newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime); newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime);
map.putAll(newMap); map.putAll(newMap);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
} }
} }
...@@ -253,7 +255,7 @@ public class RedissonSession extends StandardSession { ...@@ -253,7 +255,7 @@ public class RedissonSession extends StandardSession {
return; return;
} }
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder())); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
...@@ -319,7 +321,7 @@ public class RedissonSession extends StandardSession { ...@@ -319,7 +321,7 @@ public class RedissonSession extends StandardSession {
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime); newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime); newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime);
map.putAll(newMap); map.putAll(newMap);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
} }
expireSession(); expireSession();
...@@ -399,7 +401,7 @@ public class RedissonSession extends StandardSession { ...@@ -399,7 +401,7 @@ public class RedissonSession extends StandardSession {
private void removeRedisAttribute(String name) { private void removeRedisAttribute(String name) {
if (updateMode == UpdateMode.DEFAULT && map != null) { if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name); map.fastRemove(name);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<String>(Arrays.asList(name)))); topic.publish(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<String>(Arrays.asList(name))));
} }
} }
...@@ -448,7 +450,7 @@ public class RedissonSession extends StandardSession { ...@@ -448,7 +450,7 @@ public class RedissonSession extends StandardSession {
map.putAll(newMap); map.putAll(newMap);
map.fastRemove(removedAttributes.toArray(new String[0])); map.fastRemove(removedAttributes.toArray(new String[0]));
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
......
...@@ -48,14 +48,15 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -48,14 +48,15 @@ public class RedissonSessionManager extends ManagerBase {
private final Log log = LogFactory.getLog(RedissonSessionManager.class); private final Log log = LogFactory.getLog(RedissonSessionManager.class);
private RedissonClient redisson; protected RedissonClient redisson;
private String configPath; private String configPath;
private ReadMode readMode = ReadMode.REDIS; private ReadMode readMode = ReadMode.REDIS;
private UpdateMode updateMode = UpdateMode.DEFAULT; private UpdateMode updateMode = UpdateMode.DEFAULT;
private String keyPrefix = ""; protected String keyPrefix = "";
private boolean broadcastSessionEvents = false; private boolean broadcastSessionEvents = false;
private boolean broadcastSessionUpdates = true;
private final String nodeId = UUID.randomUUID().toString(); private final String nodeId = UUID.randomUUID().toString();
...@@ -80,7 +81,15 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -80,7 +81,15 @@ public class RedissonSessionManager extends ManagerBase {
public void setBroadcastSessionEvents(boolean replicateSessionEvents) { public void setBroadcastSessionEvents(boolean replicateSessionEvents) {
this.broadcastSessionEvents = replicateSessionEvents; this.broadcastSessionEvents = replicateSessionEvents;
} }
public boolean isBroadcastSessionUpdates() {
return broadcastSessionUpdates;
}
public void setBroadcastSessionUpdates(boolean broadcastSessionUpdates) {
this.broadcastSessionUpdates = broadcastSessionUpdates;
}
public String getReadMode() { public String getReadMode() {
return readMode.toString(); return readMode.toString();
} }
...@@ -194,7 +203,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -194,7 +203,7 @@ public class RedissonSessionManager extends ManagerBase {
@Override @Override
public Session createEmptySession() { public Session createEmptySession() {
return new RedissonSession(this, readMode, updateMode, broadcastSessionEvents); return new RedissonSession(this, readMode, updateMode, broadcastSessionEvents, this.broadcastSessionUpdates);
} }
@Override @Override
...@@ -259,7 +268,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -259,7 +268,7 @@ public class RedissonSessionManager extends ManagerBase {
} }
} }
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates || broadcastSessionEvents) {
RTopic updatesTopic = getTopic(); RTopic updatesTopic = getTopic();
messageListener = new MessageListener<AttributeMessage>() { messageListener = new MessageListener<AttributeMessage>() {
......
...@@ -71,14 +71,16 @@ public class RedissonSession extends StandardSession { ...@@ -71,14 +71,16 @@ public class RedissonSession extends StandardSession {
private Set<String> removedAttributes = Collections.emptySet(); private Set<String> removedAttributes = Collections.emptySet();
private final boolean broadcastSessionEvents; private final boolean broadcastSessionEvents;
private final boolean broadcastSessionUpdates;
public RedissonSession(RedissonSessionManager manager, ReadMode readMode, UpdateMode updateMode, boolean broadcastSessionEvents) { public RedissonSession(RedissonSessionManager manager, ReadMode readMode, UpdateMode updateMode, boolean broadcastSessionEvents, boolean broadcastSessionUpdates) {
super(manager); super(manager);
this.redissonManager = manager; this.redissonManager = manager;
this.readMode = readMode; this.readMode = readMode;
this.updateMode = updateMode; this.updateMode = updateMode;
this.topic = redissonManager.getTopic(); this.topic = redissonManager.getTopic();
this.broadcastSessionEvents = broadcastSessionEvents; this.broadcastSessionEvents = broadcastSessionEvents;
this.broadcastSessionUpdates = broadcastSessionUpdates;
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
...@@ -183,7 +185,7 @@ public class RedissonSession extends StandardSession { ...@@ -183,7 +185,7 @@ public class RedissonSession extends StandardSession {
} else { } else {
map.delete(); map.delete();
} }
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(new AttributesClearMessage(redissonManager.getNodeId(), getId())); topic.publish(new AttributesClearMessage(redissonManager.getNodeId(), getId()));
} }
map = null; map = null;
...@@ -201,7 +203,7 @@ public class RedissonSession extends StandardSession { ...@@ -201,7 +203,7 @@ public class RedissonSession extends StandardSession {
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime); newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime); newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime);
map.putAll(newMap); map.putAll(newMap);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
} }
} }
...@@ -253,7 +255,7 @@ public class RedissonSession extends StandardSession { ...@@ -253,7 +255,7 @@ public class RedissonSession extends StandardSession {
return; return;
} }
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder())); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
...@@ -319,7 +321,7 @@ public class RedissonSession extends StandardSession { ...@@ -319,7 +321,7 @@ public class RedissonSession extends StandardSession {
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime); newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime); newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime);
map.putAll(newMap); map.putAll(newMap);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
} }
expireSession(); expireSession();
...@@ -376,7 +378,7 @@ public class RedissonSession extends StandardSession { ...@@ -376,7 +378,7 @@ public class RedissonSession extends StandardSession {
private void removeRedisAttribute(String name) { private void removeRedisAttribute(String name) {
if (updateMode == UpdateMode.DEFAULT && map != null) { if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name); map.fastRemove(name);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<String>(Arrays.asList(name)))); topic.publish(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<String>(Arrays.asList(name))));
} }
} }
...@@ -425,7 +427,7 @@ public class RedissonSession extends StandardSession { ...@@ -425,7 +427,7 @@ public class RedissonSession extends StandardSession {
map.putAll(newMap); map.putAll(newMap);
map.fastRemove(removedAttributes.toArray(new String[0])); map.fastRemove(removedAttributes.toArray(new String[0]));
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
......
...@@ -48,14 +48,15 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -48,14 +48,15 @@ public class RedissonSessionManager extends ManagerBase {
private final Log log = LogFactory.getLog(RedissonSessionManager.class); private final Log log = LogFactory.getLog(RedissonSessionManager.class);
private RedissonClient redisson; protected RedissonClient redisson;
private String configPath; private String configPath;
private ReadMode readMode = ReadMode.REDIS; private ReadMode readMode = ReadMode.REDIS;
private UpdateMode updateMode = UpdateMode.DEFAULT; private UpdateMode updateMode = UpdateMode.DEFAULT;
private String keyPrefix = ""; protected String keyPrefix = "";
private boolean broadcastSessionEvents = false; private boolean broadcastSessionEvents = false;
private boolean broadcastSessionUpdates = true;
private final String nodeId = UUID.randomUUID().toString(); private final String nodeId = UUID.randomUUID().toString();
...@@ -80,7 +81,15 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -80,7 +81,15 @@ public class RedissonSessionManager extends ManagerBase {
public void setBroadcastSessionEvents(boolean replicateSessionEvents) { public void setBroadcastSessionEvents(boolean replicateSessionEvents) {
this.broadcastSessionEvents = replicateSessionEvents; this.broadcastSessionEvents = replicateSessionEvents;
} }
public boolean isBroadcastSessionUpdates() {
return broadcastSessionUpdates;
}
public void setBroadcastSessionUpdates(boolean broadcastSessionUpdates) {
this.broadcastSessionUpdates = broadcastSessionUpdates;
}
public String getReadMode() { public String getReadMode() {
return readMode.toString(); return readMode.toString();
} }
...@@ -194,7 +203,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -194,7 +203,7 @@ public class RedissonSessionManager extends ManagerBase {
@Override @Override
public Session createEmptySession() { public Session createEmptySession() {
return new RedissonSession(this, readMode, updateMode, broadcastSessionEvents); return new RedissonSession(this, readMode, updateMode, broadcastSessionEvents, this.broadcastSessionUpdates);
} }
@Override @Override
...@@ -259,7 +268,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -259,7 +268,7 @@ public class RedissonSessionManager extends ManagerBase {
} }
} }
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates || broadcastSessionEvents) {
RTopic updatesTopic = getTopic(); RTopic updatesTopic = getTopic();
messageListener = new MessageListener<AttributeMessage>() { messageListener = new MessageListener<AttributeMessage>() {
...@@ -354,7 +363,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -354,7 +363,7 @@ public class RedissonSessionManager extends ManagerBase {
super.stopInternal(); super.stopInternal();
setState(LifecycleState.STOPPING); setState(LifecycleState.STOPPING);
Pipeline pipeline = getContext().getPipeline(); Pipeline pipeline = getContext().getPipeline();
synchronized (pipeline) { synchronized (pipeline) {
if (readMode == ReadMode.REDIS) { if (readMode == ReadMode.REDIS) {
......
...@@ -71,14 +71,16 @@ public class RedissonSession extends StandardSession { ...@@ -71,14 +71,16 @@ public class RedissonSession extends StandardSession {
private Set<String> removedAttributes = Collections.emptySet(); private Set<String> removedAttributes = Collections.emptySet();
private final boolean broadcastSessionEvents; private final boolean broadcastSessionEvents;
private final boolean broadcastSessionUpdates;
public RedissonSession(RedissonSessionManager manager, ReadMode readMode, UpdateMode updateMode, boolean broadcastSessionEvents) { public RedissonSession(RedissonSessionManager manager, ReadMode readMode, UpdateMode updateMode, boolean broadcastSessionEvents, boolean broadcastSessionUpdates) {
super(manager); super(manager);
this.redissonManager = manager; this.redissonManager = manager;
this.readMode = readMode; this.readMode = readMode;
this.updateMode = updateMode; this.updateMode = updateMode;
this.topic = redissonManager.getTopic(); this.topic = redissonManager.getTopic();
this.broadcastSessionEvents = broadcastSessionEvents; this.broadcastSessionEvents = broadcastSessionEvents;
this.broadcastSessionUpdates = broadcastSessionUpdates;
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
...@@ -183,7 +185,7 @@ public class RedissonSession extends StandardSession { ...@@ -183,7 +185,7 @@ public class RedissonSession extends StandardSession {
} else { } else {
map.delete(); map.delete();
} }
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(new AttributesClearMessage(redissonManager.getNodeId(), getId())); topic.publish(new AttributesClearMessage(redissonManager.getNodeId(), getId()));
} }
map = null; map = null;
...@@ -201,7 +203,7 @@ public class RedissonSession extends StandardSession { ...@@ -201,7 +203,7 @@ public class RedissonSession extends StandardSession {
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime); newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime); newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime);
map.putAll(newMap); map.putAll(newMap);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
} }
} }
...@@ -253,7 +255,7 @@ public class RedissonSession extends StandardSession { ...@@ -253,7 +255,7 @@ public class RedissonSession extends StandardSession {
return; return;
} }
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder())); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
...@@ -319,7 +321,7 @@ public class RedissonSession extends StandardSession { ...@@ -319,7 +321,7 @@ public class RedissonSession extends StandardSession {
newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime); newMap.put(LAST_ACCESSED_TIME_ATTR, lastAccessedTime);
newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime); newMap.put(THIS_ACCESSED_TIME_ATTR, thisAccessedTime);
map.putAll(newMap); map.putAll(newMap);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
} }
expireSession(); expireSession();
...@@ -376,7 +378,7 @@ public class RedissonSession extends StandardSession { ...@@ -376,7 +378,7 @@ public class RedissonSession extends StandardSession {
private void removeRedisAttribute(String name) { private void removeRedisAttribute(String name) {
if (updateMode == UpdateMode.DEFAULT && map != null) { if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name); map.fastRemove(name);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<String>(Arrays.asList(name)))); topic.publish(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<String>(Arrays.asList(name))));
} }
} }
...@@ -425,7 +427,7 @@ public class RedissonSession extends StandardSession { ...@@ -425,7 +427,7 @@ public class RedissonSession extends StandardSession {
map.putAll(newMap); map.putAll(newMap);
map.fastRemove(removedAttributes.toArray(new String[0])); map.fastRemove(removedAttributes.toArray(new String[0]));
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates) {
topic.publish(createPutAllMessage(newMap)); topic.publish(createPutAllMessage(newMap));
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
......
...@@ -48,14 +48,15 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -48,14 +48,15 @@ public class RedissonSessionManager extends ManagerBase {
private final Log log = LogFactory.getLog(RedissonSessionManager.class); private final Log log = LogFactory.getLog(RedissonSessionManager.class);
private RedissonClient redisson; protected RedissonClient redisson;
private String configPath; private String configPath;
private ReadMode readMode = ReadMode.REDIS; private ReadMode readMode = ReadMode.REDIS;
private UpdateMode updateMode = UpdateMode.DEFAULT; private UpdateMode updateMode = UpdateMode.DEFAULT;
private String keyPrefix = ""; protected String keyPrefix = "";
private boolean broadcastSessionEvents = false; private boolean broadcastSessionEvents = false;
private boolean broadcastSessionUpdates = true;
private final String nodeId = UUID.randomUUID().toString(); private final String nodeId = UUID.randomUUID().toString();
...@@ -80,7 +81,15 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -80,7 +81,15 @@ public class RedissonSessionManager extends ManagerBase {
public void setBroadcastSessionEvents(boolean replicateSessionEvents) { public void setBroadcastSessionEvents(boolean replicateSessionEvents) {
this.broadcastSessionEvents = replicateSessionEvents; this.broadcastSessionEvents = replicateSessionEvents;
} }
public boolean isBroadcastSessionUpdates() {
return broadcastSessionUpdates;
}
public void setBroadcastSessionUpdates(boolean broadcastSessionUpdates) {
this.broadcastSessionUpdates = broadcastSessionUpdates;
}
public String getReadMode() { public String getReadMode() {
return readMode.toString(); return readMode.toString();
} }
...@@ -194,7 +203,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -194,7 +203,7 @@ public class RedissonSessionManager extends ManagerBase {
@Override @Override
public Session createEmptySession() { public Session createEmptySession() {
return new RedissonSession(this, readMode, updateMode, broadcastSessionEvents); return new RedissonSession(this, readMode, updateMode, broadcastSessionEvents, this.broadcastSessionUpdates);
} }
@Override @Override
...@@ -259,7 +268,7 @@ public class RedissonSessionManager extends ManagerBase { ...@@ -259,7 +268,7 @@ public class RedissonSessionManager extends ManagerBase {
} }
} }
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) { if (readMode == ReadMode.MEMORY && this.broadcastSessionUpdates || broadcastSessionEvents) {
RTopic updatesTopic = getTopic(); RTopic updatesTopic = getTopic();
messageListener = new MessageListener<AttributeMessage>() { messageListener = new MessageListener<AttributeMessage>() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册