提交 a4b4c38a 编写于 作者: N Nikita

replication for Tomcat sessions located in memory. #1414

上级 4e27eed2
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
private String sessionId;
public AttributeMessage() {
}
public AttributeMessage(String sessionId) {
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private String name;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String sessionId, String name) {
super(sessionId);
this.name = name;
}
public String getName() {
return name;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private Object value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String sessionId, String name, Object value) {
super(sessionId);
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String sessionId) {
super(sessionId);
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<String, Object> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String sessionId, Map<String, Object> attrs) {
super(sessionId);
this.attrs = attrs;
}
public Map<String, Object> getAttrs() {
return attrs;
}
}
......@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
import org.redisson.tomcat.RedissonSessionManager.UpdateMode;
......@@ -37,6 +38,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic<AttributeMessage> topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
......@@ -69,6 +71,15 @@ public class RedissonSession extends StandardSession {
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesClearMessage(getId()));
}
map = null;
}
@Override
......@@ -81,6 +92,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
}
}
......@@ -93,6 +107,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
......@@ -104,13 +121,20 @@ public class RedissonSession extends StandardSession {
super.setMaxInactiveInterval(interval);
if (map != null) {
map.fastPut("session:maxInactiveInterval", maxInactiveInterval);
fastPut("session:maxInactiveInterval", maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
private void fastPut(String name, Object value) {
map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeUpdateMessage(getId(), name, value));
}
}
@Override
public void setValid(boolean isValid) {
super.setValid(isValid);
......@@ -120,7 +144,7 @@ public class RedissonSession extends StandardSession {
return;
}
map.fastPut("session:isValid", isValid);
fastPut("session:isValid", isValid);
}
}
......@@ -129,7 +153,7 @@ public class RedissonSession extends StandardSession {
super.setNew(isNew);
if (map != null) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
......@@ -139,25 +163,36 @@ public class RedissonSession extends StandardSession {
super.endAccess();
if (isNew != oldValue) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
public void superSetAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
}
@Override
public void setAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
if (updateMode == UpdateMode.DEFAULT && map != null && value != null) {
map.fastPut(name, value);
fastPut(name, value);
}
}
public void superRemoveAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
}
@Override
protected void removeAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeRemoveMessage(getId(), name));
}
}
}
......@@ -177,6 +212,9 @@ public class RedissonSession extends StandardSession {
}
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
......@@ -210,7 +248,7 @@ public class RedissonSession extends StandardSession {
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
super.setAttribute(entry.getKey(), entry.getValue(), false);
}
}
......
......@@ -18,6 +18,7 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpSession;
......@@ -32,7 +33,10 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
/**
......@@ -141,7 +145,12 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
}
@Override
......@@ -149,6 +158,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
Session result = super.findSession(id);
if (result == null && id != null) {
Map<String, Object> attrs = getMap(id).readAllMap();
if (attrs.isEmpty() || !Boolean.valueOf(String.valueOf(attrs.get("session:isValid")))) {
log.info("Session " + id + " can't be found");
return null;
......@@ -178,7 +188,9 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
public void remove(Session session) {
super.remove(session);
getMap(session.getId()).delete();
if (session.getIdInternal() != null) {
((RedissonSession)session).delete();
}
}
public RedissonClient getRedisson() {
......@@ -193,6 +205,43 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
getEngine().getPipeline().addValve(new UpdateValve(this));
}
if (readMode == ReadMode.MEMORY) {
RTopic<AttributeMessage> updatesTopic = getTopic();
updatesTopic.addListener(new MessageListener<AttributeMessage>() {
@Override
public void onMessage(String channel, AttributeMessage msg) {
try {
// TODO make it thread-safe
RedissonSession session = (RedissonSession) RedissonSessionManager.super.findSession(msg.getSessionId());
if (session != null) {
if (msg instanceof AttributeRemoveMessage) {
session.superRemoveAttributeInternal(((AttributeRemoveMessage)msg).getName(), true);
}
if (msg instanceof AttributesClearMessage) {
RedissonSessionManager.super.remove(session);
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs().entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true);
}
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(), true);
}
}
} catch (IOException e) {
log.error("Can't handle topic message", e);
}
}
});
}
lifecycle.fireLifecycleEvent(START_EVENT, null);
}
......
......@@ -18,6 +18,30 @@ import org.redisson.config.Config;
public class RedissonSessionManagerTest {
@Test
public void testUpdateTwoServers() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
server1.start();
Executor executor = Executor.newInstance();
BasicCookieStore cookieStore = new BasicCookieStore();
executor.use(cookieStore);
write(executor, "test", "1234");
TomcatServer server2 = new TomcatServer("myapp", 8081, "src/test/");
server2.start();
read(8081, executor, "test", "1234");
read(executor, "test", "1234");
write(executor, "test", "324");
read(8081, executor, "test", "324");
Executor.closeIdleConnections();
server1.stop();
server2.stop();
}
@Test
public void testExpiration() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "/src/test/");
......
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
private String sessionId;
public AttributeMessage() {
}
public AttributeMessage(String sessionId) {
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private String name;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String sessionId, String name) {
super(sessionId);
this.name = name;
}
public String getName() {
return name;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private Object value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String sessionId, String name, Object value) {
super(sessionId);
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String sessionId) {
super(sessionId);
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<String, Object> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String sessionId, Map<String, Object> attrs) {
super(sessionId);
this.attrs = attrs;
}
public Map<String, Object> getAttrs() {
return attrs;
}
}
......@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
import org.redisson.tomcat.RedissonSessionManager.UpdateMode;
......@@ -37,6 +38,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic<AttributeMessage> topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
......@@ -69,10 +71,14 @@ public class RedissonSession extends StandardSession {
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesClearMessage(getId()));
}
map = null;
}
......@@ -86,6 +92,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
}
}
......@@ -98,6 +107,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
......@@ -109,12 +121,19 @@ public class RedissonSession extends StandardSession {
super.setMaxInactiveInterval(interval);
if (map != null) {
map.fastPut("session:maxInactiveInterval", maxInactiveInterval);
fastPut("session:maxInactiveInterval", maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
private void fastPut(String name, Object value) {
map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeUpdateMessage(getId(), name, value));
}
}
@Override
public void setValid(boolean isValid) {
......@@ -124,8 +143,8 @@ public class RedissonSession extends StandardSession {
if (!isValid && !map.isExists()) {
return;
}
map.fastPut("session:isValid", isValid);
fastPut("session:isValid", isValid);
}
}
......@@ -134,7 +153,7 @@ public class RedissonSession extends StandardSession {
super.setNew(isNew);
if (map != null) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
......@@ -144,25 +163,36 @@ public class RedissonSession extends StandardSession {
super.endAccess();
if (isNew != oldValue) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
public void superSetAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
}
@Override
public void setAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
if (updateMode == UpdateMode.DEFAULT && map != null && value != null) {
map.fastPut(name, value);
fastPut(name, value);
}
}
public void superRemoveAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
}
@Override
protected void removeAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeRemoveMessage(getId(), name));
}
}
}
......@@ -182,6 +212,9 @@ public class RedissonSession extends StandardSession {
}
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
......@@ -215,7 +248,7 @@ public class RedissonSession extends StandardSession {
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
super.setAttribute(entry.getKey(), entry.getValue(), false);
}
}
......
......@@ -18,6 +18,7 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpSession;
......@@ -30,7 +31,9 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
......@@ -121,7 +124,12 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
}
@Override
......@@ -129,6 +137,7 @@ public class RedissonSessionManager extends ManagerBase {
Session result = super.findSession(id);
if (result == null && id != null) {
Map<String, Object> attrs = getMap(id).readAllMap();
if (attrs.isEmpty() || !Boolean.valueOf(String.valueOf(attrs.get("session:isValid")))) {
log.info("Session " + id + " can't be found");
return null;
......@@ -142,7 +151,7 @@ public class RedissonSessionManager extends ManagerBase {
session.endAccess();
return session;
}
result.access();
result.endAccess();
......@@ -170,13 +179,49 @@ public class RedissonSessionManager extends ManagerBase {
@Override
protected void startInternal() throws LifecycleException {
super.startInternal();
redisson = buildClient();
if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this));
}
if (readMode == ReadMode.MEMORY) {
RTopic<AttributeMessage> updatesTopic = getTopic();
updatesTopic.addListener(new MessageListener<AttributeMessage>() {
@Override
public void onMessage(String channel, AttributeMessage msg) {
try {
// TODO make it thread-safe
RedissonSession session = (RedissonSession) RedissonSessionManager.super.findSession(msg.getSessionId());
if (session != null) {
if (msg instanceof AttributeRemoveMessage) {
session.superRemoveAttributeInternal(((AttributeRemoveMessage)msg).getName(), true);
}
if (msg instanceof AttributesClearMessage) {
RedissonSessionManager.super.remove(session, false);
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs().entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true);
}
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(), true);
}
}
} catch (IOException e) {
log.error("Can't handle topic message", e);
}
}
});
}
setState(LifecycleState.STARTING);
}
......
......@@ -17,6 +17,31 @@ import org.redisson.config.Config;
public class RedissonSessionManagerTest {
@Test
public void testUpdateTwoServers() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
server1.start();
Executor executor = Executor.newInstance();
BasicCookieStore cookieStore = new BasicCookieStore();
executor.use(cookieStore);
write(executor, "test", "1234");
TomcatServer server2 = new TomcatServer("myapp", 8081, "src/test/");
server2.start();
read(8081, executor, "test", "1234");
read(executor, "test", "1234");
write(executor, "test", "324");
read(8081, executor, "test", "324");
Executor.closeIdleConnections();
server1.stop();
server2.stop();
}
@Test
public void testExpiration() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
......@@ -170,7 +195,7 @@ public class RedissonSessionManagerTest {
String response = executor.execute(Request.Get(url)).returnContent().asString();
Assert.assertEquals(value, response);
}
private void read(Executor executor, String key, String value) throws IOException, ClientProtocolException {
String url = "http://localhost:8080/myapp/read?key=" + key;
String response = executor.execute(Request.Get(url)).returnContent().asString();
......
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
private String sessionId;
public AttributeMessage() {
}
public AttributeMessage(String sessionId) {
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private String name;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String sessionId, String name) {
super(sessionId);
this.name = name;
}
public String getName() {
return name;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private Object value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String sessionId, String name, Object value) {
super(sessionId);
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String sessionId) {
super(sessionId);
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<String, Object> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String sessionId, Map<String, Object> attrs) {
super(sessionId);
this.attrs = attrs;
}
public Map<String, Object> getAttrs() {
return attrs;
}
}
......@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
import org.redisson.tomcat.RedissonSessionManager.UpdateMode;
......@@ -37,6 +38,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic<AttributeMessage> topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
......@@ -69,10 +71,14 @@ public class RedissonSession extends StandardSession {
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesClearMessage(getId()));
}
map = null;
}
......@@ -86,6 +92,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
}
}
......@@ -98,6 +107,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
......@@ -109,12 +121,19 @@ public class RedissonSession extends StandardSession {
super.setMaxInactiveInterval(interval);
if (map != null) {
map.fastPut("session:maxInactiveInterval", maxInactiveInterval);
fastPut("session:maxInactiveInterval", maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
private void fastPut(String name, Object value) {
map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeUpdateMessage(getId(), name, value));
}
}
@Override
public void setValid(boolean isValid) {
......@@ -125,7 +144,7 @@ public class RedissonSession extends StandardSession {
return;
}
map.fastPut("session:isValid", isValid);
fastPut("session:isValid", isValid);
}
}
......@@ -134,7 +153,7 @@ public class RedissonSession extends StandardSession {
super.setNew(isNew);
if (map != null) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
......@@ -144,25 +163,36 @@ public class RedissonSession extends StandardSession {
super.endAccess();
if (isNew != oldValue) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
public void superSetAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
}
@Override
public void setAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
if (updateMode == UpdateMode.DEFAULT && map != null && value != null) {
map.fastPut(name, value);
fastPut(name, value);
}
}
public void superRemoveAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
}
@Override
protected void removeAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeRemoveMessage(getId(), name));
}
}
}
......@@ -182,6 +212,9 @@ public class RedissonSession extends StandardSession {
}
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
......@@ -215,7 +248,7 @@ public class RedissonSession extends StandardSession {
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
super.setAttribute(entry.getKey(), entry.getValue(), false);
}
}
......
......@@ -18,6 +18,7 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpSession;
......@@ -29,7 +30,9 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
......@@ -120,7 +123,12 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
}
@Override
......@@ -176,6 +184,43 @@ public class RedissonSessionManager extends ManagerBase {
getEngine().getPipeline().addValve(new UpdateValve(this));
}
if (readMode == ReadMode.MEMORY) {
RTopic<AttributeMessage> updatesTopic = getTopic();
updatesTopic.addListener(new MessageListener<AttributeMessage>() {
@Override
public void onMessage(String channel, AttributeMessage msg) {
try {
// TODO make it thread-safe
RedissonSession session = (RedissonSession) RedissonSessionManager.super.findSession(msg.getSessionId());
if (session != null) {
if (msg instanceof AttributeRemoveMessage) {
session.superRemoveAttributeInternal(((AttributeRemoveMessage)msg).getName(), true);
}
if (msg instanceof AttributesClearMessage) {
RedissonSessionManager.super.remove(session, false);
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs().entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true);
}
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(), true);
}
}
} catch (IOException e) {
log.error("Can't handle topic message", e);
}
}
});
}
setState(LifecycleState.STARTING);
}
......
......@@ -17,6 +17,31 @@ import org.redisson.config.Config;
public class RedissonSessionManagerTest {
@Test
public void testUpdateTwoServers() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
server1.start();
Executor executor = Executor.newInstance();
BasicCookieStore cookieStore = new BasicCookieStore();
executor.use(cookieStore);
write(executor, "test", "1234");
TomcatServer server2 = new TomcatServer("myapp", 8081, "src/test/");
server2.start();
read(8081, executor, "test", "1234");
read(executor, "test", "1234");
write(executor, "test", "324");
read(8081, executor, "test", "324");
Executor.closeIdleConnections();
server1.stop();
server2.stop();
}
@Test
public void testExpiration() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
......
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage {
private String sessionId;
public AttributeMessage() {
}
public AttributeMessage(String sessionId) {
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private String name;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String sessionId, String name) {
super(sessionId);
this.name = name;
}
public String getName() {
return name;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private Object value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String sessionId, String name, Object value) {
super(sessionId);
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String sessionId) {
super(sessionId);
}
}
/**
* Copyright 2018 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.tomcat;
import java.util.Map;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<String, Object> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String sessionId, Map<String, Object> attrs) {
super(sessionId);
this.attrs = attrs;
}
public Map<String, Object> getAttrs() {
return attrs;
}
}
......@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.catalina.session.StandardSession;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.tomcat.RedissonSessionManager.ReadMode;
import org.redisson.tomcat.RedissonSessionManager.UpdateMode;
......@@ -37,6 +38,7 @@ public class RedissonSession extends StandardSession {
private final RedissonSessionManager redissonManager;
private final Map<String, Object> attrs;
private RMap<String, Object> map;
private RTopic<AttributeMessage> topic;
private final RedissonSessionManager.ReadMode readMode;
private final UpdateMode updateMode;
......@@ -69,10 +71,14 @@ public class RedissonSession extends StandardSession {
public void setId(String id, boolean notify) {
super.setId(id, notify);
map = redissonManager.getMap(id);
topic = redissonManager.getTopic();
}
public void delete() {
map.delete();
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesClearMessage(getId()));
}
map = null;
}
......@@ -86,6 +92,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
}
}
......@@ -98,6 +107,9 @@ public class RedissonSession extends StandardSession {
newMap.put("session:lastAccessedTime", lastAccessedTime);
newMap.put("session:thisAccessedTime", thisAccessedTime);
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (getMaxInactiveInterval() >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
......@@ -109,12 +121,19 @@ public class RedissonSession extends StandardSession {
super.setMaxInactiveInterval(interval);
if (map != null) {
map.fastPut("session:maxInactiveInterval", maxInactiveInterval);
fastPut("session:maxInactiveInterval", maxInactiveInterval);
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
}
}
}
private void fastPut(String name, Object value) {
map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeUpdateMessage(getId(), name, value));
}
}
@Override
public void setValid(boolean isValid) {
......@@ -125,7 +144,7 @@ public class RedissonSession extends StandardSession {
return;
}
map.fastPut("session:isValid", isValid);
fastPut("session:isValid", isValid);
}
}
......@@ -134,7 +153,7 @@ public class RedissonSession extends StandardSession {
super.setNew(isNew);
if (map != null) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
......@@ -144,25 +163,36 @@ public class RedissonSession extends StandardSession {
super.endAccess();
if (isNew != oldValue) {
map.fastPut("session:isNew", isNew);
fastPut("session:isNew", isNew);
}
}
public void superSetAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
}
@Override
public void setAttribute(String name, Object value, boolean notify) {
super.setAttribute(name, value, notify);
if (updateMode == UpdateMode.DEFAULT && map != null && value != null) {
map.fastPut(name, value);
fastPut(name, value);
}
}
public void superRemoveAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
}
@Override
protected void removeAttributeInternal(String name, boolean notify) {
super.removeAttributeInternal(name, notify);
if (updateMode == UpdateMode.DEFAULT && map != null) {
map.fastRemove(name);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributeRemoveMessage(getId(), name));
}
}
}
......@@ -182,6 +212,9 @@ public class RedissonSession extends StandardSession {
}
map.putAll(newMap);
if (readMode == ReadMode.MEMORY) {
topic.publish(new AttributesPutAllMessage(getId(), newMap));
}
if (maxInactiveInterval >= 0) {
map.expire(getMaxInactiveInterval(), TimeUnit.SECONDS);
......@@ -215,7 +248,7 @@ public class RedissonSession extends StandardSession {
}
for (Entry<String, Object> entry : attrs.entrySet()) {
setAttribute(entry.getKey(), entry.getValue(), false);
super.setAttribute(entry.getKey(), entry.getValue(), false);
}
}
......
......@@ -18,6 +18,7 @@ package org.redisson.tomcat;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import javax.servlet.http.HttpSession;
......@@ -29,7 +30,9 @@ import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
......@@ -120,7 +123,12 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
return redisson.getMap(keyPrefix + separator + "redisson_tomcat_session:" + sessionId);
final String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name);
}
public RTopic<AttributeMessage> getTopic() {
return redisson.getTopic("redisson:tomcat_session_updates");
}
@Override
......@@ -142,7 +150,7 @@ public class RedissonSessionManager extends ManagerBase {
session.endAccess();
return session;
}
result.access();
result.endAccess();
......@@ -176,6 +184,43 @@ public class RedissonSessionManager extends ManagerBase {
getEngine().getPipeline().addValve(new UpdateValve(this));
}
if (readMode == ReadMode.MEMORY) {
RTopic<AttributeMessage> updatesTopic = getTopic();
updatesTopic.addListener(new MessageListener<AttributeMessage>() {
@Override
public void onMessage(String channel, AttributeMessage msg) {
try {
// TODO make it thread-safe
RedissonSession session = (RedissonSession) RedissonSessionManager.super.findSession(msg.getSessionId());
if (session != null) {
if (msg instanceof AttributeRemoveMessage) {
session.superRemoveAttributeInternal(((AttributeRemoveMessage)msg).getName(), true);
}
if (msg instanceof AttributesClearMessage) {
RedissonSessionManager.super.remove(session, false);
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs().entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true);
}
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(), true);
}
}
} catch (IOException e) {
log.error("Can't handle topic message", e);
}
}
});
}
setState(LifecycleState.STARTING);
}
......
......@@ -17,6 +17,31 @@ import org.redisson.config.Config;
public class RedissonSessionManagerTest {
@Test
public void testUpdateTwoServers() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
server1.start();
Executor executor = Executor.newInstance();
BasicCookieStore cookieStore = new BasicCookieStore();
executor.use(cookieStore);
write(executor, "test", "1234");
TomcatServer server2 = new TomcatServer("myapp", 8081, "src/test/");
server2.start();
read(8081, executor, "test", "1234");
read(executor, "test", "1234");
write(executor, "test", "324");
read(8081, executor, "test", "324");
Executor.closeIdleConnections();
server1.stop();
server2.stop();
}
@Test
public void testExpiration() throws Exception {
TomcatServer server1 = new TomcatServer("myapp", 8080, "src/test/");
......@@ -44,7 +69,7 @@ public class RedissonSessionManagerTest {
server1.stop();
server2.stop();
}
@Test
public void testSwitchServer() throws Exception {
// start the server at http://localhost:8080/myapp
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册