未验证 提交 5be3e933 编写于 作者: A Andrew Lyu 提交者: GitHub

Merge branch 'master' into fix-readme

......@@ -4,6 +4,25 @@ Redisson Releases History
Сonsider __[Redisson PRO](https://redisson.pro)__ version for advanced features and support by SLA.
### 29-Apr-2019 - versions 3.10.7 released
Feature - Add support for [Reactive and RxJava2 interfaces](https://github.com/redisson/redisson/wiki/9.-distributed-services#913-remote-service-asynchronous-reactive-and-rxjava2-calls) to RemoteService object
Feature - MILLISECONDS option added to RRateLimiter.RateIntervalUnit object
Feature - range method added to RList, RListReactive and RListRx interfaces
Improvement - `JCache.getAll` execution optimization for non-existing keys
Improvement - 10X Performance boost for `JCache.putAll` method
Fixed - disconnected sentinels shouldn't be used in sentinel list
Fixed - Apache Tomcat `RedissonSessionManager` doesn't use classloader aware codec for session Map object (thanks to [jchobantonov](https://github.com/jchobantonov))
Fixed - LiveObject field with Map type couldn't be persisted
Fixed - `RRateLimiter` allows permits limit exceeding
Fixed - `CompositeCodec.getMapValueDecoder` method uses `MapKeyDecoder` instead of `MapValueDecoder`
Fixed - memory leak during blocking methods invocation of Queue objects
Fixed - Apache Tomcat `RedissonSessionManager.findSession` shouldn't create a new one session (thanks to [jchobantonov](https://github.com/jchobantonov))
Fixed - `JCache.removeAll` method doesn't notify Cache listeners
Fixed - `UpdateValve` sould be removed from pipeline in Apache Tomcat `RedissonSessionManager.stopInternal` method (thanks to [jchobantonov](https://github.com/jchobantonov))
Fixed - Redis Sentinel prior 5.0.1 version doesn't require password. Regression since 3.10.5 version
Fixed - Redisson tries to renewed Lock expiration even if lock doesn't exist. Regression since 3.10.5 version
Fixed - FstCodec can't deserialize ConcurrentHashMap based object with package visibility
### 05-Apr-2019 - versions 3.10.6 released
Feature - `broadcastSessionEvents` setting added to Tomcat Session Manager
Feature - `remainTimeToLive` method added to `RLock`, `RLockAsync`, `RLockRx` and `RLockReactive` interfaces
......
# Redisson - Redis Java client<br/>with features of In-Memory Data Grid
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.10.5) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)**
[Quick start](https://github.com/redisson/redisson#quick-start) | [Documentation](https://github.com/redisson/redisson/wiki) | [Javadocs](http://www.javadoc.io/doc/org.redisson/redisson/3.10.6) | [Changelog](https://github.com/redisson/redisson/blob/master/CHANGELOG.md) | [Code examples](https://github.com/redisson/redisson-examples) | [FAQs](https://github.com/redisson/redisson/wiki/16.-FAQ) | [Report an issue](https://github.com/redisson/redisson/issues/new) | **[Redisson PRO](https://redisson.pro)**
Based on high-performance async and lock-free Java Redis client and [Netty](http://netty.io) framework.
JDK compatibility: 1.8 - 12, Android
......@@ -103,12 +103,12 @@ Used by
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.6</version>
<version>3.10.7</version>
</dependency>
#### Gradle
compile 'org.redisson:redisson:3.10.6'
compile 'org.redisson:redisson:3.10.7'
#### Java
......@@ -134,8 +134,8 @@ Consider __[Redisson PRO](https://redisson.pro)__ version for advanced features
## Downloads
[Redisson 3.10.6](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.10.6&e=jar),
[Redisson node 3.10.6](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.10.6&e=jar)
[Redisson 3.10.7](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson&v=3.10.7&e=jar),
[Redisson node 3.10.7](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.10.7&e=jar)
## FAQs
......
......@@ -3,7 +3,7 @@
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Redisson</name>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-hibernate</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-hibernate</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-hibernate</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-hibernate</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -18,7 +18,8 @@ Add `RedissonSessionManager` into `tomcat/conf/context.xml`
```xml
<Manager className="org.redisson.tomcat.RedissonSessionManager"
configPath="${catalina.base}/redisson.conf" readMode="REDIS" updateMode="DEFAULT"/>
configPath="${catalina.base}/redisson.conf"
readMode="REDIS" updateMode="DEFAULT" broadcastSessionEvents="false"/>
```
`readMode` - read Session attributes mode. Two modes are available:
* `MEMORY` - stores attributes into local Tomcat Session and Redis. Further Session updates propagated to local Tomcat Session using Redis-based events.
......@@ -64,14 +65,15 @@ Add `RedissonSessionManager` into `tomcat/conf/context.xml`
### 2. Copy two jars into `TOMCAT_BASE/lib` directory:
[redisson-all-3.10.6.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.10.6&e=jar)
for Tomcat 6.x
[redisson-tomcat-6-3.10.6.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.10.6&e=jar)
for Tomcat 7.x
[redisson-tomcat-7-3.10.6.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.10.6&e=jar)
for Tomcat 8.x
[redisson-tomcat-8-3.10.6.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.10.6&e=jar)
for Tomcat 9.x
[redisson-tomcat-9-3.10.6.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-9&v=3.10.6&e=jar)
[redisson-all-3.10.7.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-all&v=3.10.7&e=jar)
for Tomcat 6.x
[redisson-tomcat-6-3.10.7.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-6&v=3.10.7&e=jar)
for Tomcat 7.x
[redisson-tomcat-7-3.10.7.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-7&v=3.10.7&e=jar)
for Tomcat 8.x
[redisson-tomcat-8-3.10.7.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-8&v=3.10.7&e=jar)
for Tomcat 9.x
[redisson-tomcat-9-3.10.7.jar](https://repository.sonatype.org/service/local/artifact/maven/redirect?r=central-proxy&g=org.redisson&a=redisson-tomcat-9&v=3.10.7&e=jar)
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-tomcat</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......
......@@ -28,6 +28,7 @@ import org.apache.catalina.Context;
import org.apache.catalina.Lifecycle;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleListener;
import org.apache.catalina.Pipeline;
import org.apache.catalina.Session;
import org.apache.catalina.SessionEvent;
import org.apache.catalina.SessionListener;
......@@ -69,6 +70,10 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
public String getUpdateMode() {
......@@ -164,7 +169,7 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, codecToUse, codecToUse));
}
public RTopic getTopic() {
......@@ -184,6 +189,11 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
} catch (Exception e) {
log.error("Can't read session object by id: " + id, e);
}
if (attrs.isEmpty()) {
log.info("Session " + id + " can't be found");
return null;
}
RedissonSession session = (RedissonSession) createEmptySession();
session.load(attrs);
......@@ -231,14 +241,15 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
redisson = buildClient();
final ClassLoader applicationClassLoader;
if (Thread.currentThread().getContextClassLoader() != null) {
if (getContainer().getLoader().getClassLoader() != null) {
applicationClassLoader = getContainer().getLoader().getClassLoader();
} else if (Thread.currentThread().getContextClassLoader() != null) {
applicationClassLoader = Thread.currentThread().getContextClassLoader();
} else {
applicationClassLoader = getClass().getClassLoader();
}
Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
......@@ -248,7 +259,12 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this));
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
......@@ -330,6 +346,13 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
@Override
public void stop() throws LifecycleException {
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
}
codecToUse = null;
try {
shutdownRedisson();
} catch (Exception e) {
......
......@@ -43,7 +43,14 @@ public class UpdateValve extends ValveBase {
try {
getNext().invoke(request, response);
} finally {
manager.store(request.getSession(false));
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}
}
}
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-tomcat</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......@@ -17,25 +17,25 @@
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>7.0.91</version>
<version>7.0.94</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-logging-juli</artifactId>
<version>7.0.91</version>
<version>7.0.94</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<version>7.0.91</version>
<version>7.0.94</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jasper</artifactId>
<version>7.0.91</version>
<version>7.0.94</version>
<scope>provided</scope>
</dependency>
</dependencies>
......
......@@ -27,6 +27,7 @@ import javax.servlet.http.HttpSession;
import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleState;
import org.apache.catalina.Pipeline;
import org.apache.catalina.Session;
import org.apache.catalina.SessionEvent;
import org.apache.catalina.SessionListener;
......@@ -67,6 +68,10 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
public String getUpdateMode() {
......@@ -143,7 +148,7 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, codecToUse, codecToUse));
}
public RTopic getTopic() {
......@@ -163,6 +168,11 @@ public class RedissonSessionManager extends ManagerBase {
} catch (Exception e) {
log.error("Can't read session object by id: " + id, e);
}
if (attrs.isEmpty()) {
log.info("Session " + id + " can't be found");
return null;
}
RedissonSession session = (RedissonSession) createEmptySession();
session.load(attrs);
......@@ -211,14 +221,15 @@ public class RedissonSessionManager extends ManagerBase {
redisson = buildClient();
final ClassLoader applicationClassLoader;
if (Thread.currentThread().getContextClassLoader() != null) {
if (getContainer().getLoader().getClassLoader() != null) {
applicationClassLoader = getContainer().getLoader().getClassLoader();
} else if (Thread.currentThread().getContextClassLoader() != null) {
applicationClassLoader = Thread.currentThread().getContextClassLoader();
} else {
applicationClassLoader = getClass().getClassLoader();
}
Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
......@@ -228,7 +239,12 @@ public class RedissonSessionManager extends ManagerBase {
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this));
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
......@@ -314,6 +330,13 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
}
codecToUse = null;
try {
shutdownRedisson();
} catch (Exception e) {
......
......@@ -43,7 +43,14 @@ public class UpdateValve extends ValveBase {
try {
getNext().invoke(request, response);
} finally {
manager.store(request.getSession(false));
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}
}
}
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-tomcat</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......@@ -17,19 +17,19 @@
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>8.5.34</version>
<version>[8.5.40,)</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<version>8.5.34</version>
<version>[8.5.40,)</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jasper</artifactId>
<version>8.5.34</version>
<version>[8.5.40,)</version>
<scope>provided</scope>
</dependency>
</dependencies>
......
......@@ -26,6 +26,7 @@ import javax.servlet.http.HttpSession;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleState;
import org.apache.catalina.Pipeline;
import org.apache.catalina.Session;
import org.apache.catalina.SessionEvent;
import org.apache.catalina.SessionListener;
......@@ -66,6 +67,10 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
public String getUpdateMode() {
......@@ -142,7 +147,7 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, codecToUse, codecToUse));
}
public RTopic getTopic() {
......@@ -162,7 +167,12 @@ public class RedissonSessionManager extends ManagerBase {
} catch (Exception e) {
log.error("Can't read session object by id: " + id, e);
}
if (attrs.isEmpty()) {
log.info("Session " + id + " can't be found");
return null;
}
RedissonSession session = (RedissonSession) createEmptySession();
session.load(attrs);
session.setId(id);
......@@ -210,14 +220,15 @@ public class RedissonSessionManager extends ManagerBase {
redisson = buildClient();
final ClassLoader applicationClassLoader;
if (Thread.currentThread().getContextClassLoader() != null) {
if (getContext().getLoader().getClassLoader() != null) {
applicationClassLoader = getContext().getLoader().getClassLoader();
} else if (Thread.currentThread().getContextClassLoader() != null) {
applicationClassLoader = Thread.currentThread().getContextClassLoader();
} else {
applicationClassLoader = getClass().getClassLoader();
}
Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
......@@ -227,7 +238,12 @@ public class RedissonSessionManager extends ManagerBase {
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this));
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
......@@ -313,6 +329,13 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
}
codecToUse = null;
try {
shutdownRedisson();
} catch (Exception e) {
......
......@@ -43,7 +43,14 @@ public class UpdateValve extends ValveBase {
try {
getNext().invoke(request, response);
} finally {
manager.store(request.getSession(false));
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}
}
}
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-tomcat</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......@@ -17,19 +17,19 @@
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>9.0.12</version>
<version>[9.0.19,)</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<version>9.0.12</version>
<version>[9.0.19,)</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jasper</artifactId>
<version>9.0.12</version>
<version>[9.0.19,)</version>
<scope>provided</scope>
</dependency>
</dependencies>
......
......@@ -26,6 +26,7 @@ import javax.servlet.http.HttpSession;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleState;
import org.apache.catalina.Pipeline;
import org.apache.catalina.Session;
import org.apache.catalina.SessionEvent;
import org.apache.catalina.SessionListener;
......@@ -66,6 +67,10 @@ public class RedissonSessionManager extends ManagerBase {
private final String nodeId = UUID.randomUUID().toString();
private UpdateValve updateValve;
private Codec codecToUse;
public String getNodeId() { return nodeId; }
public String getUpdateMode() {
......@@ -142,7 +147,7 @@ public class RedissonSessionManager extends ManagerBase {
public RMap<String, Object> getMap(String sessionId) {
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
String name = keyPrefix + separator + "redisson:tomcat_session:" + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, redisson.getConfig().getCodec()));
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, codecToUse, codecToUse));
}
public RTopic getTopic() {
......@@ -162,6 +167,11 @@ public class RedissonSessionManager extends ManagerBase {
} catch (Exception e) {
log.error("Can't read session object by id: " + id, e);
}
if (attrs.isEmpty()) {
log.info("Session " + id + " can't be found");
return null;
}
RedissonSession session = (RedissonSession) createEmptySession();
session.load(attrs);
......@@ -210,14 +220,15 @@ public class RedissonSessionManager extends ManagerBase {
redisson = buildClient();
final ClassLoader applicationClassLoader;
if (Thread.currentThread().getContextClassLoader() != null) {
if (getContext().getLoader().getClassLoader() != null) {
applicationClassLoader = getContext().getLoader().getClassLoader();
} else if (Thread.currentThread().getContextClassLoader() != null) {
applicationClassLoader = Thread.currentThread().getContextClassLoader();
} else {
applicationClassLoader = getClass().getClassLoader();
}
Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
......@@ -227,7 +238,12 @@ public class RedissonSessionManager extends ManagerBase {
}
if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this));
Pipeline pipeline = getEngine().getPipeline();
if (updateValve != null) { // in case startInternal is called without stopInternal cleaning the updateValve
pipeline.removeValve(updateValve);
}
updateValve = new UpdateValve(this);
pipeline.addValve(updateValve);
}
if (readMode == ReadMode.MEMORY || broadcastSessionEvents) {
......@@ -313,6 +329,13 @@ public class RedissonSessionManager extends ManagerBase {
setState(LifecycleState.STOPPING);
if (updateValve != null) {
getEngine().getPipeline().removeValve(updateValve);
updateValve = null;
}
codecToUse = null;
try {
shutdownRedisson();
} catch (Exception e) {
......
......@@ -43,7 +43,14 @@ public class UpdateValve extends ValveBase {
try {
getNext().invoke(request, response);
} finally {
manager.store(request.getSession(false));
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
ClassLoader applicationClassLoader = request.getContext().getLoader().getClassLoader();
Thread.currentThread().setContextClassLoader(applicationClassLoader);
manager.store(request.getSession(false));
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
}
}
}
......
......@@ -230,7 +230,7 @@
</module>
<module name="SuppressionFilter">
<property name="file" value="suppressions.xml"/>
<property name="file" value="${checkstyle.config.path}/suppressions.xml"/>
<property name="optional" value="false"/>
</module>
......
......@@ -4,7 +4,7 @@
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.10.7-SNAPSHOT</version>
<version>3.10.8-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
......@@ -98,7 +98,13 @@
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>1.41</version>
<version>1.46</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jooq</groupId>
<artifactId>joor-java-8</artifactId>
<version>0.9.11</version>
<scope>test</scope>
</dependency>
<dependency>
......@@ -117,19 +123,19 @@
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>8.5.34</version>
<version>[8.5.40,)</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<version>8.5.34</version>
<version>[8.5.40,)</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tomcat-jasper</artifactId>
<version>8.5.34</version>
<version>[8.5.40,)</version>
<scope>test</scope>
</dependency>
<dependency>
......@@ -357,6 +363,7 @@
<consoleOutput>true</consoleOutput>
<enableRSS>false</enableRSS>
<configLocation>/checkstyle.xml</configLocation>
<propertyExpansion>checkstyle.config.path=${basedir}</propertyExpansion>
</configuration>
<dependencies>
<dependency>
......
......@@ -480,7 +480,7 @@ public class Redisson implements RedissonClient {
} else {
executorId = connectionManager.getId() + ":" + name;
}
return new RedissonRemoteService(codec, this, name, connectionManager.getCommandExecutor(), executorId, responses);
return new RedissonRemoteService(codec, name, connectionManager.getCommandExecutor(), executorId, responses);
}
@Override
......
......@@ -102,8 +102,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private static final Logger LOGGER = LoggerFactory.getLogger(RedissonExecutorService.class);
@SuppressWarnings("StaticVariableName")
private static RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS);
private static final RemoteInvocationOptions RESULT_OPTIONS = RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.HOURS);
public static final int SHUTDOWN_STATE = 1;
public static final int TERMINATED_STATE = 2;
......@@ -166,7 +165,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
this.executorId = connectionManager.getId().toString() + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}
remoteService = new RedissonExecutorRemoteService(codec, redisson, name, connectionManager.getCommandExecutor(), executorId, responses);
remoteService = new RedissonExecutorRemoteService(codec, name, connectionManager.getCommandExecutor(), executorId, responses);
requestQueueName = ((RedissonRemoteService) remoteService).getRequestQueueName(RemoteExecutorService.class);
responseQueueName = ((RedissonRemoteService) remoteService).getResponseQueueName(executorId);
String objectName = requestQueueName;
......@@ -185,7 +184,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
workersTopic = redisson.getTopic(workersChannelName);
executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses);
executorRemoteService = new TasksService(codec, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
......@@ -197,7 +196,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses);
scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId, responses);
scheduledRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName);
......@@ -321,10 +320,8 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public void execute(Runnable task) {
check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncServiceWithoutResult.executeRunnable(
new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
createTaskParameters(task));
syncExecute(promise);
}
......@@ -338,9 +335,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
for (Runnable task : tasks) {
check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
asyncServiceWithoutResult.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
asyncServiceWithoutResult.executeRunnable(createTaskParameters(task));
}
List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd();
......@@ -350,7 +345,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
}
private TasksBatchService createBatchService() {
TasksBatchService executorRemoteService = new TasksBatchService(codec, redisson, name, commandExecutor, executorId, responses);
TasksBatchService executorRemoteService = new TasksBatchService(codec, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
......@@ -567,9 +562,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(createTaskParameters(task));
addListener(result);
return createFuture(result);
}
......@@ -585,9 +578,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Callable<?> task : tasks) {
check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(createTaskParameters(task));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
......@@ -599,7 +590,19 @@ public class RedissonExecutorService implements RScheduledExecutorService {
return new RedissonExecutorBatchFuture(result);
}
protected TaskParameters createTaskParameters(Callable<?> task) {
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state);
}
protected TaskParameters createTaskParameters(Runnable task) {
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
return new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state);
}
@Override
public RExecutorBatchFuture submitAsync(Callable<?>...tasks) {
if (tasks.length == 0) {
......@@ -611,9 +614,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<>();
for (Callable<?> task : tasks) {
check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RemotePromise<?> promise = (RemotePromise<?>) asyncService.executeCallable(createTaskParameters(task));
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise);
result.add(executorFuture);
}
......@@ -701,9 +702,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
RemoteExecutorServiceAsync asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RESULT_OPTIONS);
for (Runnable task : tasks) {
check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
......@@ -727,9 +726,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
List<RExecutorFuture<?>> result = new ArrayList<>();
for (Runnable task : tasks) {
check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RemotePromise<Void> promise = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise);
result.add(executorFuture);
}
......@@ -767,9 +764,7 @@ public class RedissonExecutorService implements RScheduledExecutorService {
@Override
public RExecutorFuture<?> submitAsync(Runnable task) {
check(task);
ClassBody classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(new TaskParameters(classBody.getClazzName(), classBody.getClazz(), classBody.getLambda(), state));
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(createTaskParameters(task));
addListener(result);
return createFuture(result);
}
......
......@@ -915,4 +915,24 @@ public class RedissonList<V> extends RedissonExpirable implements RList<V> {
return commandExecutor.readAsync(getName(), codec, RedisCommands.SORT_LIST, params.toArray());
}
@Override
public RFuture<List<V>> rangeAsync(int toIndex) {
return rangeAsync(0, toIndex);
}
@Override
public RFuture<List<V>> rangeAsync(int fromIndex, int toIndex) {
return commandExecutor.readAsync(getName(), codec, LRANGE, getName(), fromIndex, toIndex);
}
@Override
public List<V> range(int toIndex) {
return get(rangeAsync(toIndex));
}
@Override
public List<V> range(int fromIndex, int toIndex) {
return get(rangeAsync(fromIndex, toIndex));
}
}
......@@ -164,18 +164,7 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
@Override
public RFuture<List<V>> readAllAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return {};"
+ "end; "
+ "return redis.call('lrange', KEYS[2], 0, -1);",
Arrays.<Object>asList(timeoutSetName, getName()),
System.currentTimeMillis(), encodeMapKey(key));
return rangeAsync(0, -1);
}
@Override
......@@ -893,6 +882,37 @@ public class RedissonListMultimapValues<V> extends RedissonExpirable implements
return list.sortToAsync(destName, byPattern, getPatterns, order, offset, count);
}
@Override
public RFuture<List<V>> rangeAsync(int toIndex) {
return rangeAsync(0, toIndex);
}
@Override
public RFuture<List<V>> rangeAsync(int fromIndex, int toIndex) {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if expireDate <= tonumber(ARGV[1]) then "
+ "return {};"
+ "end; "
+ "return redis.call('lrange', KEYS[2], ARGV[3], ARGV[4]);",
Arrays.<Object>asList(timeoutSetName, getName()),
System.currentTimeMillis(), encodeMapKey(key), fromIndex, toIndex);
}
@Override
public List<V> range(int toIndex) {
return get(rangeAsync(toIndex));
}
@Override
public List<V> range(int fromIndex, int toIndex) {
return get(rangeAsync(fromIndex, toIndex));
}
}
......@@ -326,7 +326,7 @@ public class RedissonLiveObjectService implements RLiveObjectService {
}
} else if (rObject instanceof Map) {
Map<Object, Object> rMap = (Map<Object, Object>) rObject;
Map<?, ?> map = (Map<?, ?>) rObject;
Map<?, ?> map = (Map<?, ?>) object;
for (Entry<?, ?> entry : map.entrySet()) {
Object key = entry.getKey();
Object value = entry.getValue();
......
......@@ -33,7 +33,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RFuture;
......@@ -41,16 +40,12 @@ import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RedissonClient;
import org.redisson.cache.Cache;
import org.redisson.cache.CacheKey;
import org.redisson.cache.LFUCacheMap;
import org.redisson.cache.LRUCacheMap;
import org.redisson.cache.LocalCacheListener;
import org.redisson.cache.LocalCacheView;
import org.redisson.cache.LocalCachedMapClear;
import org.redisson.cache.LocalCachedMapInvalidate;
import org.redisson.cache.LocalCachedMapUpdate;
import org.redisson.cache.LocalCachedMessageCodec;
import org.redisson.cache.NoneCacheMap;
import org.redisson.cache.ReferenceCacheMap;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
......@@ -145,13 +140,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
private void init(String name, LocalCachedMapOptions<K, V> options, RedissonClient redisson, EvictionScheduler evictionScheduler) {
instanceId = generateId();
syncStrategy = options.getSyncStrategy();
cache = createCache(options);
listener = new LocalCacheListener(name, commandExecutor, cache, this, instanceId, codec, options, cacheUpdateLogTime) {
listener = new LocalCacheListener(name, commandExecutor, this, codec, options, cacheUpdateLogTime) {
@Override
protected void updateCache(ByteBuf keyBuf, ByteBuf valueBuf) throws IOException {
......@@ -162,7 +153,9 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
}
};
listener.add();
cache = listener.createCache(options);
instanceId = listener.generateId();
listener.add(cache);
localCacheView = new LocalCacheView(cache, this);
if (options.getSyncStrategy() != SyncStrategy.NONE) {
......@@ -182,25 +175,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
cache.put(cacheKey, new CacheValue(key, value));
}
protected Cache<CacheKey, CacheValue> createCache(LocalCachedMapOptions<K, V> options) {
if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
return new NoneCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
return new LRUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
return new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.SOFT) {
return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.WEAK) {
return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
throw new IllegalArgumentException("Invalid eviction policy: " + options.getEvictionPolicy());
}
public CacheKey toCacheKey(Object key) {
ByteBuf encoded = encodeMapKey(key);
try {
......@@ -259,12 +233,6 @@ public class RedissonLocalCachedMap<K, V> extends RedissonMap<K, V> implements R
return future;
}
protected static byte[] generateId() {
byte[] id = new byte[16];
ThreadLocalRandom.current().nextBytes(id);
return id;
}
protected static byte[] generateLogEntryId(byte[] keyHash) {
byte[] result = new byte[keyHash.length + 1 + 8];
result[16] = ':';
......
......@@ -284,8 +284,10 @@ public class RedissonLock extends RedissonExpirable implements RLock {
return;
}
// reschedule itself
renewExpiration();
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
......
......@@ -200,6 +200,7 @@ public class RedissonRateLimiter extends RedissonObject implements RRateLimiter
+ "return nil; "
+ "end; "
+ "else "
+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "
+ "redis.call('set', valueName, rate, 'px', interval); "
+ "redis.call('decrby', valueName, ARGV[1]); "
+ "return nil; "
......
......@@ -17,6 +17,8 @@ package org.redisson;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNode;
......@@ -48,6 +50,7 @@ import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RQueueReactive;
import org.redisson.api.RRateLimiterReactive;
import org.redisson.api.RReadWriteLockReactive;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScoredSortedSetReactive;
import org.redisson.api.RScriptReactive;
import org.redisson.api.RSemaphoreReactive;
......@@ -83,6 +86,7 @@ import org.redisson.reactive.RedissonSetMultimapReactive;
import org.redisson.reactive.RedissonSetReactive;
import org.redisson.reactive.RedissonTopicReactive;
import org.redisson.reactive.RedissonTransactionReactive;
import org.redisson.remote.ResponseEntry;
/**
* Main infrastructure class allows to get access
......@@ -98,6 +102,8 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final CommandReactiveService commandExecutor;
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected RedissonReactive(Config config) {
this.config = config;
......@@ -400,6 +406,32 @@ public class RedissonReactive implements RedissonReactiveClient {
public RAtomicDoubleReactive getAtomicDouble(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(commandExecutor, name), RAtomicDoubleReactive.class);
}
@Override
public RRemoteService getRemoteService() {
return getRemoteService("redisson_rs", connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(String name) {
return getRemoteService(name, connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(Codec codec) {
return getRemoteService("redisson_rs", codec);
}
@Override
public RRemoteService getRemoteService(String name, Codec codec) {
String executorId;
if (codec == connectionManager.getCodec()) {
executorId = connectionManager.getId().toString();
} else {
executorId = connectionManager.getId() + ":" + name;
}
return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses);
}
@Override
public RBitSetReactive getBitSet(String name) {
......
......@@ -31,13 +31,11 @@ import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
......@@ -90,8 +88,8 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
private final Map<RemoteServiceKey, RemoteServiceMethod> beans = new ConcurrentHashMap<>();
private final Map<Class<?>, Entry> remoteMap = new ConcurrentHashMap<>();
public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}
@Override
......@@ -150,6 +148,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
register(remoteInterface, object, workers, commandExecutor.getConnectionManager().getExecutor());
}
private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, commandExecutor, name, null);
}
@Override
public <T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor) {
if (workers < 1) {
......@@ -166,10 +168,10 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
remoteMap.put(remoteInterface, new Entry(workers));
String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<String> requestQueue = redisson.getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
RBlockingQueue<String> requestQueue = getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
subscribe(remoteInterface, requestQueue, executor);
}
private <T> void subscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue,
ExecutorService executor) {
Entry entry = remoteMap.get(remoteInterface);
......@@ -185,8 +187,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
if (e != null) {
if (e instanceof RedissonShutdownException
|| redisson.isShuttingDown()) {
if (e instanceof RedissonShutdownException) {
return;
}
log.error("Can't process the remote service request.", e);
......@@ -207,7 +208,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
subscribe(remoteInterface, requestQueue, executor);
}
RMap<String, RemoteServiceRequest> tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, RemoteServiceRequest> tasks = getMap(requestQueue.getName() + ":tasks");
RFuture<RemoteServiceRequest> taskFuture = getTask(requestId, tasks);
taskFuture.onComplete((request, exc) -> {
if (exc != null) {
......@@ -277,7 +278,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
}
RList<Object> list = redisson.getList(responseName, codec);
RList<Object> list = new RedissonList<>(codec, commandExecutor, responseName, null);
RFuture<Boolean> addFuture = list.addAsync(new RemoteServiceAck(request.getId()));
addFuture.onComplete((res, exce) -> {
if (exce != null) {
......@@ -336,7 +337,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
// could be removed not from future object
if (r.isSendResponse()) {
RMap<String, RemoteServiceCancelResponse> map = redisson.getMap(cancelResponseMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, RemoteServiceCancelResponse> map = getMap(cancelResponseMapName);
map.fastPutAsync(request.getId(), response);
map.expireAsync(60, TimeUnit.SECONDS);
}
......@@ -371,7 +372,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
timeout = request.getOptions().getExecutionTimeoutInMillis();
}
RBlockingQueueAsync<RRemoteServiceResponse> queue = redisson.getBlockingQueue(responseName, codec);
RBlockingQueueAsync<RRemoteServiceResponse> queue = getBlockingQueue(responseName, codec);
RFuture<Void> clientsFuture = queue.putAsync(responseHolder.get());
queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
......
......@@ -15,6 +15,9 @@
*/
package org.redisson;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.BatchOptions;
import org.redisson.api.ClusterNode;
import org.redisson.api.MapOptions;
......@@ -43,6 +46,7 @@ import org.redisson.api.RPermitExpirableSemaphoreRx;
import org.redisson.api.RQueueRx;
import org.redisson.api.RRateLimiterRx;
import org.redisson.api.RReadWriteLockRx;
import org.redisson.api.RRemoteService;
import org.redisson.api.RScoredSortedSetRx;
import org.redisson.api.RScriptRx;
import org.redisson.api.RSemaphoreRx;
......@@ -61,6 +65,7 @@ import org.redisson.config.Config;
import org.redisson.config.ConfigSupport;
import org.redisson.connection.ConnectionManager;
import org.redisson.eviction.EvictionScheduler;
import org.redisson.remote.ResponseEntry;
import org.redisson.rx.CommandRxExecutor;
import org.redisson.rx.CommandRxService;
import org.redisson.rx.RedissonBatchRx;
......@@ -96,6 +101,8 @@ public class RedissonRx implements RedissonRxClient {
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ConcurrentMap<String, ResponseEntry> responses = new ConcurrentHashMap<>();
protected RedissonRx(Config config) {
this.config = config;
Config configCopy = new Config(config);
......@@ -376,6 +383,32 @@ public class RedissonRx implements RedissonRxClient {
public RAtomicDoubleRx getAtomicDouble(String name) {
return RxProxyBuilder.create(commandExecutor, new RedissonAtomicDouble(commandExecutor, name), RAtomicDoubleRx.class);
}
@Override
public RRemoteService getRemoteService() {
return getRemoteService("redisson_rs", connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(String name) {
return getRemoteService(name, connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(Codec codec) {
return getRemoteService("redisson_rs", codec);
}
@Override
public RRemoteService getRemoteService(String name, Codec codec) {
String executorId;
if (codec == connectionManager.getCodec()) {
executorId = connectionManager.getId().toString();
} else {
executorId = connectionManager.getId() + ":" + name;
}
return new RedissonRemoteService(codec, name, commandExecutor, executorId, responses);
}
@Override
public RBitSetRx getBitSet(String name) {
......
......@@ -92,6 +92,25 @@ public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<
*/
void trim(int fromIndex, int toIndex);
/**
* Returns range of values from 0 index to <code>toIndex</code>. Indexes are zero based.
* <code>-1</code> means the last element, <code>-2</code> means penultimate and so on.
*
* @param toIndex - end index
* @return
*/
List<V> range(int toIndex);
/**
* Returns range of values from <code>fromIndex</code> to <code>toIndex</code> index including.
* Indexes are zero based. <code>-1</code> means the last element, <code>-2</code> means penultimate and so on.
*
* @param fromIndex - start index
* @param toIndex - end index
* @return
*/
List<V> range(int fromIndex, int toIndex);
/**
* Remove object by specified index
*
......
......@@ -166,4 +166,23 @@ public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<
*/
RFuture<Boolean> removeAsync(Object element, int count);
/**
* Returns range of values from 0 index to <code>toIndex</code>. Indexes are zero based.
* <code>-1</code> means the last element, <code>-2</code> means penultimate and so on.
*
* @param toIndex - end index
* @return
*/
RFuture<List<V>> rangeAsync(int toIndex);
/**
* Returns range of values from <code>fromIndex</code> to <code>toIndex</code> index including.
* Indexes are zero based. <code>-1</code> means the last element, <code>-2</code> means penultimate and so on.
*
* @param fromIndex - start index
* @param toIndex - end index
* @return
*/
RFuture<List<V>> rangeAsync(int fromIndex, int toIndex);
}
......@@ -162,5 +162,24 @@ public interface RListReactive<V> extends RCollectionReactive<V>, RSortableReact
* @return void
*/
Mono<Void> fastRemove(int index);
/**
* Returns range of values from 0 index to <code>toIndex</code>. Indexes are zero based.
* <code>-1</code> means the last element, <code>-2</code> means penultimate and so on.
*
* @param toIndex - end index
* @return
*/
Mono<List<V>> range(int toIndex);
/**
* Returns range of values from <code>fromIndex</code> to <code>toIndex</code> index including.
* Indexes are zero based. <code>-1</code> means the last element, <code>-2</code> means penultimate and so on.
*
* @param fromIndex - start index
* @param toIndex - end index
* @return
*/
Mono<List<V>> range(int fromIndex, int toIndex);
}
......@@ -164,5 +164,24 @@ public interface RListRx<V> extends RCollectionRx<V>, RSortableRx<List<V>> {
* @return void
*/
Completable fastRemove(int index);
/**
* Returns range of values from 0 index to <code>toIndex</code>. Indexes are zero based.
* <code>-1</code> means the last element, <code>-2</code> means penultimate and so on.
*
* @param toIndex - end index
* @return
*/
Single<List<V>> range(int toIndex);
/**
* Returns range of values from <code>fromIndex</code> to <code>toIndex</code> index including.
* Indexes are zero based. <code>-1</code> means the last element, <code>-2</code> means penultimate and so on.
*
* @param fromIndex - start index
* @param toIndex - end index
* @return
*/
Single<List<V>> range(int fromIndex, int toIndex);
}
......@@ -24,6 +24,14 @@ import java.util.concurrent.TimeUnit;
*/
public enum RateIntervalUnit {
MILLISECONDS {
@Override
public long toMillis(long value) {
return value;
}
},
SECONDS {
@Override
public long toMillis(long value) {
......
......@@ -581,6 +581,40 @@ public interface RedissonReactiveClient {
*/
RAtomicDoubleReactive getAtomicDouble(String name);
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
*
* @return RemoteService object
*/
RRemoteService getRemoteService();
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
* and uses provided codec for method arguments and result.
*
* @param codec - codec for response and request
* @return RemoteService object
*/
RRemoteService getRemoteService(Codec codec);
/**
* Returns object for remote operations prefixed with the specified name
*
* @param name - the name used as the Redis key prefix for the services
* @return RemoteService object
*/
RRemoteService getRemoteService(String name);
/**
* Returns object for remote operations prefixed with the specified name
* and uses provided codec for method arguments and result.
*
* @param name - the name used as the Redis key prefix for the services
* @param codec - codec for response and request
* @return RemoteService object
*/
RRemoteService getRemoteService(String name, Codec codec);
/**
* Returns bitSet instance by name.
*
......
......@@ -569,6 +569,40 @@ public interface RedissonRxClient {
*/
RAtomicDoubleRx getAtomicDouble(String name);
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
*
* @return RemoteService object
*/
RRemoteService getRemoteService();
/**
* Returns object for remote operations prefixed with the default name (redisson_remote_service)
* and uses provided codec for method arguments and result.
*
* @param codec - codec for response and request
* @return RemoteService object
*/
RRemoteService getRemoteService(Codec codec);
/**
* Returns object for remote operations prefixed with the specified name
*
* @param name - the name used as the Redis key prefix for the services
* @return RemoteService object
*/
RRemoteService getRemoteService(String name);
/**
* Returns object for remote operations prefixed with the specified name
* and uses provided codec for method arguments and result.
*
* @param name - the name used as the Redis key prefix for the services
* @param codec - codec for response and request
* @return RemoteService object
*/
RRemoteService getRemoteService(String name, Codec codec);
/**
* Returns bitSet instance by name.
*
......
......@@ -25,11 +25,13 @@ import java.lang.annotation.Target;
* client interface for remote service interface.
* <p>
* All method signatures must match with remote service interface,
* but return type must be <code>io.netty.util.concurrent.Future</code>.
* but return type must be <code>org.redisson.api.RFuture</code>.
* <p>
* It's not necessary to add all methods from remote service.
* Add only those which are needed.
*
* @see org.redisson.api.RFuture
*
* @author Nikita Koksharov
*
*/
......
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation used to mark interface as Reactive
* client interface for remote service interface.
* <p>
* All method signatures must match with remote service interface,
* but return type must be <code>reactor.core.publisher.Mono</code>.
* <p>
* It's not necessary to add all methods from remote service.
* Add only those which are needed.
*
* @see reactor.core.publisher.Mono
*
* @author Nikita Koksharov
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RRemoteReactive {
/**
* Remote interface class used to register
*
* @return class used to register
*/
Class<?> value();
}
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation used to mark interface as RxJava2
* client interface for remote service interface.
* <p>
* All method signatures must match with remote service interface,
* but return type must be one of the following:
* <ul>
* <li>io.reactivex.Completable</li>
* <li>io.reactivex.Single</li>
* <li>io.reactivex.Maybe</li>
* </ul>
* <p>
* It's not necessary to add all methods from remote service.
* Add only those which are needed.
*
* @see io.reactivex.Completable
* @see io.reactivex.Single
* @see io.reactivex.Maybe
*
* @author Nikita Koksharov
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RRemoteRx {
/**
* Remote interface class used to register
*
* @return class used to register
*/
Class<?> value();
}
......@@ -23,13 +23,16 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonListMultimapCache;
import org.redisson.RedissonObject;
import org.redisson.RedissonScoredSortedSet;
import org.redisson.RedissonTopic;
import org.redisson.RedissonLocalCachedMap.CacheValue;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.LocalCachedMapOptions.EvictionPolicy;
import org.redisson.api.LocalCachedMapOptions.ReconnectionStrategy;
import org.redisson.api.LocalCachedMapOptions.SyncStrategy;
import org.redisson.api.RFuture;
......@@ -70,7 +73,7 @@ public abstract class LocalCacheListener {
private CommandAsyncExecutor commandExecutor;
private Cache<?, ?> cache;
private RObject object;
private byte[] instanceId;
private byte[] instanceId = new byte[16];
private Codec codec;
private LocalCachedMapOptions<?, ?> options;
......@@ -80,24 +83,48 @@ public abstract class LocalCacheListener {
private int syncListenerId;
private int reconnectionListenerId;
public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor, Cache<?, ?> cache,
RObject object, byte[] instanceId, Codec codec, LocalCachedMapOptions<?, ?> options, long cacheUpdateLogTime) {
public LocalCacheListener(String name, CommandAsyncExecutor commandExecutor,
RObject object, Codec codec, LocalCachedMapOptions<?, ?> options, long cacheUpdateLogTime) {
super();
this.name = name;
this.commandExecutor = commandExecutor;
this.cache = cache;
this.object = object;
this.instanceId = instanceId;
this.codec = codec;
this.options = options;
this.cacheUpdateLogTime = cacheUpdateLogTime;
}
public byte[] generateId() {
ThreadLocalRandom.current().nextBytes(instanceId);
return instanceId;
}
public Cache<CacheKey, CacheValue> createCache(LocalCachedMapOptions<?, ?> options) {
if (options.getEvictionPolicy() == EvictionPolicy.NONE) {
return new NoneCacheMap<CacheKey, CacheValue>(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LRU) {
return new LRUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.LFU) {
return new LFUCacheMap<CacheKey, CacheValue>(options.getCacheSize(), options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.SOFT) {
return ReferenceCacheMap.soft(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
if (options.getEvictionPolicy() == EvictionPolicy.WEAK) {
return ReferenceCacheMap.weak(options.getTimeToLiveInMillis(), options.getMaxIdleInMillis());
}
throw new IllegalArgumentException("Invalid eviction policy: " + options.getEvictionPolicy());
}
public boolean isDisabled(Object key) {
return disabledKeys.containsKey(key);
}
public void add() {
public void add(Cache<?, ?> cache) {
this.cache = cache;
invalidationTopic = new RedissonTopic(LocalCachedMessageCodec.INSTANCE, commandExecutor, getInvalidationTopicName());
if (options.getReconnectionStrategy() != ReconnectionStrategy.NONE) {
......
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;
/**
* This error occurs when Redis requires authentication.
*
* @author Nikita Koksharov
*
*/
public class RedisAuthRequiredException extends RedisException {
private static final long serialVersionUID = -2565335188503354660L;
public RedisAuthRequiredException(String message) {
super(message);
}
}
......@@ -38,6 +38,7 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisAuthRequiredException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisLoadingException;
import org.redisson.client.RedisMovedException;
......@@ -346,6 +347,9 @@ public class CommandDecoder extends ReplayingDecoder<State> {
} else if (error.contains("-OOM ")) {
data.tryFailure(new RedisOutOfMemoryException(error.split("-OOM ")[1]
+ ". channel: " + channel + " data: " + data));
} else if (error.startsWith("NOAUTH")) {
data.tryFailure(new RedisAuthRequiredException(error
+ ". channel: " + channel + " data: " + data));
} else {
if (data != null) {
data.tryFailure(new RedisException(error + ". channel: " + channel + " command: " + LogHelper.toString(data)));
......
......@@ -376,7 +376,7 @@ public class ClusterConnectionManager extends MasterSlaveConnectionManager {
for (ClusterNodeInfo clusterNodeInfo : nodes) {
nodesValue.append(clusterNodeInfo.getNodeInfo()).append("\n");
}
log.debug("cluster nodes state from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
log.debug("cluster nodes state got from {}:\n{}", connection.getRedisClient().getAddr(), nodesValue);
}
Collection<ClusterPartition> newPartitions = parsePartitions(nodes);
......
......@@ -51,7 +51,7 @@ public class CompositeCodec implements Codec {
@Override
public Decoder<Object> getMapValueDecoder() {
return mapValueCodec.getMapKeyDecoder();
return mapValueCodec.getMapValueDecoder();
}
@Override
......
......@@ -29,7 +29,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
......@@ -88,6 +87,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
/**
*
......@@ -697,7 +697,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (details.getConnectionFuture().cancel(false)) {
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Unable to get connection! Try to increase 'nettyThreads' and 'connection pool' settings or set decodeInExecutor = true and increase 'threads' setting"
details.setException(new RedisTimeoutException("Unable to get connection! Try to increase 'nettyThreads' and/or connection pool size settings"
+ "Node source: " + source
+ ", command: " + LogHelper.toString(command, details.getParams())
+ " after " + details.getAttempt() + " retry attempts"));
......@@ -708,7 +708,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
if (details.getWriteFuture() != null && details.getWriteFuture().cancel(false)) {
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Unable to send command! "
details.setException(new RedisTimeoutException("Unable to send command! Try to increase 'nettyThreads' and/or connection pool size settings "
+ "Node source: " + source + ", connection: " + details.getConnectionFuture().getNow()
+ ", current command in queue: " + details.getConnectionFuture().getNow().getCurrentCommand()
+ ", command: " + LogHelper.toString(command, details.getParams())
......@@ -740,7 +740,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
if (details.getAttempt() == connectionManager.getConfig().getRetryAttempts()) {
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Unable to send command! Node source: " + source
details.setException(new RedisTimeoutException("Unable to send command! Try to increase 'nettyThreads' and/or connection pool size settings. Node source: " + source
+ ", command: " + LogHelper.toString(command, details.getParams())
+ " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts"));
}
......@@ -941,15 +941,8 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
private <R, V> void handleBlockingOperations(AsyncDetails<V, R> details, RedisConnection connection, Long popTimeout) {
AtomicBoolean skip = new AtomicBoolean();
BiConsumer<Boolean, Throwable> listener = new BiConsumer<Boolean, Throwable>() {
@Override
public void accept(Boolean t, Throwable u) {
if (skip.get()) {
return;
}
details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown"));
}
FutureListener<Void> listener = f -> {
details.getMainPromise().tryFailure(new RedissonShutdownException("Redisson is shutdown"));
};
Timeout scheduledFuture;
......@@ -973,7 +966,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
}
synchronized (listener) {
skip.set(true);
connectionManager.getShutdownPromise().removeListener(listener);
}
// handling cancel operation for blocking commands
......@@ -992,7 +985,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
synchronized (listener) {
if (!details.getMainPromise().isDone()) {
connectionManager.getShutdownPromise().onComplete(listener);
connectionManager.getShutdownPromise().addListener(listener);
}
}
}
......
......@@ -37,6 +37,7 @@ import org.redisson.pubsub.PublishSubscribeService;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
/**
*
......@@ -109,6 +110,6 @@ public interface ConnectionManager {
InfinitySemaphoreLatch getShutdownLatch();
RFuture<Boolean> getShutdownPromise();
Future<Void> getShutdownPromise();
}
......@@ -73,6 +73,9 @@ import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
/**
......@@ -132,7 +135,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<>(MAX_SLOT);
private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
private final RPromise<Boolean> shutdownPromise;
private final Promise<Void> shutdownPromise = ImmediateEventExecutor.INSTANCE.newPromise();
private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();
......@@ -217,7 +220,6 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
this.cfg = cfg;
this.codec = cfg.getCodec();
this.shutdownPromise = new RedissonPromise<Boolean>();
this.commandExecutor = new CommandSyncService(this);
}
......@@ -651,7 +653,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
timer.stop();
shutdownLatch.close();
shutdownPromise.trySuccess(true);
shutdownPromise.trySuccess(null);
shutdownLatch.awaitUninterruptibly();
if (cfg.getEventLoopGroup() == null) {
......@@ -691,7 +693,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {
}
@Override
public RFuture<Boolean> getShutdownPromise() {
public Future<Void> getShutdownPromise() {
return shutdownPromise;
}
......
......@@ -33,10 +33,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.redisson.api.NodeType;
import org.redisson.api.RFuture;
import org.redisson.client.RedisAuthRequiredException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisClientConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.codec.StringCodec;
......@@ -55,6 +58,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.resolver.AddressResolver;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ScheduledFuture;
......@@ -68,13 +72,16 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ConcurrentMap<String, RedisClient> sentinels = new ConcurrentHashMap<>();
private final Set<URI> sentinelHosts = new HashSet<>();
private final ConcurrentMap<URI, RedisClient> sentinels = new ConcurrentHashMap<>();
private final AtomicReference<String> currentMaster = new AtomicReference<>();
private final Set<URI> disconnectedSlaves = new HashSet<>();
private ScheduledFuture<?> monitorFuture;
private AddressResolver<InetSocketAddress> sentinelResolver;
private boolean usePassword = false;
public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id) {
super(config, id);
......@@ -91,6 +98,26 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
this.sentinelResolver = resolverGroup.getResolver(getGroup().next());
for (URI addr : cfg.getSentinelAddresses()) {
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts(), null);
try {
RedisConnection c = client.connect();
try {
c.sync(RedisCommands.PING);
} catch (RedisAuthRequiredException e) {
usePassword = true;
}
client.shutdown();
break;
} catch (Exception e) {
// skip
}
}
for (URI addr : cfg.getSentinelAddresses()) {
if (NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null && !addr.getHost().equals("localhost")) {
sentinelHosts.add(convert(addr.getHost(), "" + addr.getPort()));
}
RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getRetryInterval() * this.config.getRetryAttempts(), null);
try {
RedisConnection connection = client.connect();
......@@ -99,6 +126,10 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
List<String> master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
if (master.isEmpty()) {
throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
}
String masterHost = createAddress(master.get(0), master.get(1));
this.config.setMasterAddress(masterHost);
currentMaster.set(masterHost);
......@@ -137,13 +168,15 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
String ip = map.get("ip");
String port = map.get("port");
String host = createAddress(ip, port);
log.info("sentinel: {} added", host);
URI sentinelAddr = URIBuilder.create(host);
URI sentinelAddr = convert(ip, port);
RFuture<Void> future = registerSentinel(sentinelAddr, this.config);
connectionFutures.add(future);
}
URI currentAddr = convert(client.getAddr().getAddress().getHostAddress(), "" + client.getAddr().getPort());
RFuture<Void> f = registerSentinel(currentAddr, this.config);
connectionFutures.add(f);
for (RFuture<Void> future : connectionFutures) {
future.awaitUninterruptibly(this.config.getConnectTimeout());
}
......@@ -176,20 +209,28 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
@Override
protected void startDNSMonitoring(RedisClient masterHost) {
if (config.getDnsMonitoringInterval() == -1) {
if (config.getDnsMonitoringInterval() == -1 || sentinelHosts.isEmpty()) {
return;
}
scheduleSentinelDNSCheck();
}
@Override
protected RedisClientConfig createRedisConfig(NodeType type, URI address, int timeout, int commandTimeout,
String sslHostname) {
RedisClientConfig result = super.createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
if (type == NodeType.SENTINEL && !usePassword) {
result.setPassword(null);
}
return result;
}
private void scheduleSentinelDNSCheck() {
monitorFuture = group.schedule(new Runnable() {
@Override
public void run() {
List<RedisClient> sentinels = new ArrayList<>(SentinelConnectionManager.this.sentinels.values());
AtomicInteger sentinelsCounter = new AtomicInteger(sentinels.size());
AtomicInteger sentinelsCounter = new AtomicInteger(sentinelHosts.size());
FutureListener<List<InetSocketAddress>> commonListener = new FutureListener<List<InetSocketAddress>>() {
@Override
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
......@@ -199,42 +240,24 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
};
for (RedisClient client : sentinels) {
Future<List<InetSocketAddress>> allNodes = sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(client.getAddr().getHostName(), client.getAddr().getPort()));
for (URI host : sentinelHosts) {
Future<List<InetSocketAddress>> allNodes = sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
@Override
public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
if (!future.isSuccess()) {
log.error("Unable to resolve " + client.getAddr().getHostName(), future.cause());
log.error("Unable to resolve " + host.getHost(), future.cause());
return;
}
boolean clientFound = false;
for (InetSocketAddress addr : future.getNow()) {
boolean found = false;
for (RedisClient currentSentinel : SentinelConnectionManager.this.sentinels.values()) {
if (currentSentinel.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress())
&& currentSentinel.getAddr().getPort() == addr.getPort()) {
found = true;
break;
}
}
if (!found) {
URI uri = convert(addr.getAddress().getHostAddress(), "" + addr.getPort());
Set<URI> newUris = future.getNow().stream()
.map(addr -> convert(addr.getAddress().getHostAddress(), "" + addr.getPort()))
.collect(Collectors.toSet());
for (URI uri : newUris) {
if (!sentinels.containsKey(uri)) {
registerSentinel(uri, getConfig());
}
if (client.getAddr().getAddress().getHostAddress().equals(addr.getAddress().getHostAddress())
&& client.getAddr().getPort() == addr.getPort()) {
clientFound = true;
}
}
if (!clientFound) {
String addr = client.getAddr().getAddress().getHostAddress() + ":" + client.getAddr().getPort();
RedisClient sentinel = SentinelConnectionManager.this.sentinels.remove(addr);
if (sentinel != null) {
sentinel.shutdownAsync();
log.warn("sentinel: {} has down", addr);
}
}
}
});
......@@ -405,21 +428,46 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
for (Map<String, String> map : list) {
if (map.isEmpty()) {
continue;
Set<URI> newUris = list.stream().filter(m -> {
String flags = m.get("flags");
if (!m.isEmpty() && !flags.contains("disconnected") && !flags.contains("s_down")) {
return true;
}
String ip = map.get("ip");
String port = map.get("port");
URI sentinelAddr = convert(ip, port);
registerSentinel(sentinelAddr, getConfig());
}
return false;
}).map(m -> {
String ip = m.get("ip");
String port = m.get("port");
return convert(ip, port);
}).collect(Collectors.toSet());
InetSocketAddress addr = connection.getRedisClient().getAddr();
URI currentAddr = convert(addr.getAddress().getHostAddress(), "" + addr.getPort());
newUris.add(currentAddr);
updateSentinels(newUris);
});
sentinelsFuture.onComplete(commonListener);
}
private void updateSentinels(Set<URI> newUris) {
Set<URI> currentUris = new HashSet<>(SentinelConnectionManager.this.sentinels.keySet());
Set<URI> addedUris = new HashSet<>(newUris);
addedUris.removeAll(currentUris);
for (URI uri : addedUris) {
registerSentinel(uri, getConfig());
}
currentUris.removeAll(newUris);
for (URI uri : currentUris) {
RedisClient sentinel = SentinelConnectionManager.this.sentinels.remove(uri);
if (sentinel != null) {
sentinel.shutdownAsync();
log.warn("sentinel: {} has down", uri);
}
}
}
private String createAddress(String host, Object port) {
if (host.contains(":") && !host.startsWith("[")) {
host = "[" + host + "]";
......@@ -438,8 +486,7 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
}
private RFuture<Void> registerSentinel(URI addr, MasterSlaveServersConfig c) {
String key = addr.getHost() + ":" + addr.getPort();
RedisClient sentinel = sentinels.get(key);
RedisClient sentinel = sentinels.get(addr);
if (sentinel != null) {
return RedissonPromise.newSucceededFuture(null);
}
......@@ -453,7 +500,8 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {
return;
}
sentinels.putIfAbsent(key, client);
sentinels.putIfAbsent(addr, client);
log.info("sentinel: {} added", addr);
result.trySuccess(null);
});
return result;
......
......@@ -20,9 +20,8 @@ import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonRemoteService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandExecutor;
import org.redisson.command.CommandAsyncService;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.ResponseEntry;
......@@ -33,9 +32,9 @@ import org.redisson.remote.ResponseEntry;
*/
public class RedissonExecutorRemoteService extends RedissonRemoteService {
public RedissonExecutorRemoteService(Codec codec, RedissonClient redisson, String name,
CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
public RedissonExecutorRemoteService(Codec codec, String name,
CommandAsyncService commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}
@Override
......
......@@ -21,7 +21,6 @@ import java.util.concurrent.ThreadLocalRandom;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
......@@ -41,8 +40,8 @@ public class ScheduledTasksService extends TasksService {
private RequestId requestId;
public ScheduledTasksService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, redissonId, responses);
public ScheduledTasksService(Codec codec, String name, CommandExecutor commandExecutor, String redissonId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, redissonId, responses);
}
public void setRequestId(RequestId requestId) {
......
......@@ -19,7 +19,6 @@ import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
......@@ -35,8 +34,8 @@ public class TasksBatchService extends TasksService {
private CommandBatchService batchCommandService;
public TasksBatchService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
public TasksBatchService(Codec codec, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
batchCommandService = new CommandBatchService(commandExecutor.getConnectionManager());
}
......
......@@ -161,7 +161,7 @@ public class TasksRunnerService implements RemoteExecutorService {
* @return
*/
private RemoteExecutorServiceAsync asyncScheduledServiceAtFixed(String executorId, String requestId) {
ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses);
ScheduledTasksService scheduledRemoteService = new ScheduledTasksService(codec, name, commandExecutor, executorId, responses);
scheduledRemoteService.setTerminationTopicName(terminationTopicName);
scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName);
......
......@@ -22,12 +22,10 @@ import java.util.concurrent.TimeUnit;
import org.redisson.RedissonExecutorService;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.params.TaskParameters;
import org.redisson.misc.RPromise;
......@@ -55,8 +53,8 @@ public class TasksService extends BaseRemoteService {
protected String tasksRetryIntervalName;
protected long tasksRetryInterval;
public TasksService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
public TasksService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, name, commandExecutor, executorId, responses);
}
public void setTasksRetryIntervalName(String tasksRetryIntervalName) {
......@@ -194,7 +192,7 @@ public class TasksService extends BaseRemoteService {
return;
}
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, RemoteServiceCancelRequest> canceledRequests = getMap(cancelRequestMapName);
canceledRequests.putAsync(requestId.toString(), new RemoteServiceCancelRequest(true, true));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
......
......@@ -46,16 +46,17 @@ public class CommandReactiveService extends CommandAsyncService implements Comma
emitter.error(e);
return;
}
emitter.onDispose(() -> {
future.cancel(true);
});
future.onComplete((v, e) -> {
if (e != null) {
emitter.error(e);
return;
}
emitter.onDispose(() -> {
future.cancel(true);
});
if (v != null) {
emitter.next(v);
}
......
......@@ -19,14 +19,17 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.redisson.RedissonBucket;
import org.redisson.RedissonList;
import org.redisson.RedissonMap;
import org.redisson.api.RFuture;
import org.redisson.api.RList;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
......@@ -48,13 +51,40 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
protected final String cancelRequestMapName;
public AsyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, RedissonClient redisson, Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, redisson, codec, executorId, remoteService);
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, String cancelRequestMapName, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, remoteService);
this.cancelRequestMapName = cancelRequestMapName;
}
protected List<Class<?>> permittedClasses() {
return Arrays.asList(RFuture.class);
}
public <T> T create(Class<T> remoteInterface, RemoteInvocationOptions options,
Class<?> syncInterface) {
for (Method m : remoteInterface.getMethods()) {
try {
syncInterface.getMethod(m.getName(), m.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Method '" + m.getName() + "' with params '"
+ Arrays.toString(m.getParameterTypes()) + "' isn't defined in " + syncInterface);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
boolean permitted = false;
for (Class<?> clazz : permittedClasses()) {
if (clazz.isAssignableFrom(m.getReturnType())) {
permitted = true;
break;
}
}
if (!permitted) {
throw new IllegalArgumentException(
m.getReturnType().getClass() + " isn't allowed as return type");
}
}
// local copy of the options, to prevent mutation
RemoteInvocationOptions optionsCopy = new RemoteInvocationOptions(options);
InvocationHandler handler = new InvocationHandler() {
......@@ -165,16 +195,20 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
}
});
return result;
return convertResult(result, method.getReturnType());
}
};
return (T) Proxy.newProxyInstance(remoteInterface.getClassLoader(), new Class[] { remoteInterface }, handler);
}
protected Object convertResult(RemotePromise<Object> result, Class<?> returnType) {
return result;
}
private void awaitResultAsync(RemoteInvocationOptions optionsCopy, RemotePromise<Object> result,
String ackName, RFuture<RRemoteServiceResponse> responseFuture) {
RFuture<Boolean> deleteFuture = redisson.getBucket(ackName).deleteAsync();
RFuture<Boolean> deleteFuture = new RedissonBucket<>(commandExecutor, ackName).deleteAsync();
deleteFuture.onComplete((res, e) -> {
if (e != null) {
result.tryFailure(e);
......@@ -252,7 +286,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
boolean ackNotSent = commandExecutor.get(future);
if (ackNotSent) {
RList<Object> list = redisson.getList(requestQueueName, LongCodec.INSTANCE);
RList<Object> list = new RedissonList<>(LongCodec.INSTANCE, commandExecutor, requestQueueName, null);
list.remove(requestId.toString());
super.cancel(mayInterruptIfRunning);
return true;
......@@ -294,7 +328,7 @@ public class AsyncRemoteProxy extends BaseRemoteProxy {
private void cancelExecution(RemoteInvocationOptions optionsCopy,
boolean mayInterruptIfRunning, RemotePromise<Object> remotePromise) {
RMap<String, RemoteServiceCancelRequest> canceledRequests = redisson.getMap(cancelRequestMapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, RemoteServiceCancelRequest> canceledRequests = new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, cancelRequestMapName, null, null, null);
canceledRequests.fastPutAsync(remotePromise.getRequestId().toString(), new RemoteServiceCancelRequest(mayInterruptIfRunning, false));
canceledRequests.expireAsync(60, TimeUnit.SECONDS);
......
......@@ -25,9 +25,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.redisson.RedissonBlockingQueue;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
......@@ -54,19 +54,17 @@ public abstract class BaseRemoteProxy {
private final String name;
final String responseQueueName;
private final ConcurrentMap<String, ResponseEntry> responses;
final RedissonClient redisson;
final Codec codec;
final String executorId;
final BaseRemoteService remoteService;
BaseRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, RedissonClient redisson, Codec codec, String executorId, BaseRemoteService remoteService) {
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, BaseRemoteService remoteService) {
super();
this.commandExecutor = commandExecutor;
this.name = name;
this.responseQueueName = responseQueueName;
this.responses = responses;
this.redisson = redisson;
this.codec = codec;
this.executorId = executorId;
this.remoteService = remoteService;
......@@ -203,12 +201,16 @@ public abstract class BaseRemoteProxy {
return responseFuture;
}
private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, commandExecutor, name, null);
}
private void pollResponse(ResponseEntry ent) {
if (!ent.getStarted().compareAndSet(false, true)) {
return;
}
RBlockingQueue<RRemoteServiceResponse> queue = redisson.getBlockingQueue(responseQueueName, codec);
RBlockingQueue<RRemoteServiceResponse> queue = getBlockingQueue(responseQueueName, codec);
RFuture<RRemoteServiceResponse> future = queue.takeAsync();
future.onComplete(createResponseListener());
}
......@@ -230,7 +232,7 @@ public abstract class BaseRemoteProxy {
RequestId key = new RequestId(response.getId());
List<Result> list = entry.getResponses().get(key);
if (list == null) {
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
RBlockingQueue<RRemoteServiceResponse> responseQueue = getBlockingQueue(responseQueueName, codec);
responseQueue.takeAsync().onComplete(createResponseListener());
return;
}
......@@ -246,7 +248,7 @@ public abstract class BaseRemoteProxy {
if (entry.getResponses().isEmpty()) {
responses.remove(responseQueueName, entry);
} else {
RBlockingQueue<RRemoteServiceResponse> responseQueue = redisson.getBlockingQueue(responseQueueName, codec);
RBlockingQueue<RRemoteServiceResponse> responseQueue = getBlockingQueue(responseQueueName, codec);
responseQueue.takeAsync().onComplete(createResponseListener());
}
}
......
......@@ -26,11 +26,13 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.redisson.RedissonMap;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.api.annotation.RRemoteReactive;
import org.redisson.api.annotation.RRemoteRx;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
......@@ -56,7 +58,6 @@ public abstract class BaseRemoteService {
private final ConcurrentMap<Method, long[]> methodSignaturesCache = new ConcurrentHashMap<>();
protected final Codec codec;
protected final RedissonClient redisson;
protected final String name;
protected final CommandAsyncExecutor commandExecutor;
protected final String executorId;
......@@ -66,9 +67,8 @@ public abstract class BaseRemoteService {
protected final String responseQueueName;
private final ConcurrentMap<String, ResponseEntry> responses;
public BaseRemoteService(Codec codec, RedissonClient redisson, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
public BaseRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
this.codec = codec;
this.redisson = redisson;
this.name = name;
this.commandExecutor = commandExecutor;
this.executorId = executorId;
......@@ -127,27 +127,24 @@ public abstract class BaseRemoteService {
for (Annotation annotation : remoteInterface.getAnnotations()) {
if (annotation.annotationType() == RRemoteAsync.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteAsync) annotation).value();
for (Method m : remoteInterface.getMethods()) {
try {
syncInterface.getMethod(m.getName(), m.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Method '" + m.getName() + "' with params '"
+ Arrays.toString(m.getParameterTypes()) + "' isn't defined in " + syncInterface);
} catch (SecurityException e) {
throw new IllegalArgumentException(e);
}
if (!m.getReturnType().getClass().isInstance(RFuture.class)) {
throw new IllegalArgumentException(
m.getReturnType().getClass() + " isn't allowed as return type");
}
}
AsyncRemoteProxy proxy = new AsyncRemoteProxy(commandExecutor, name, responseQueueName, responses, redisson, codec, executorId, cancelRequestMapName, this);
AsyncRemoteProxy proxy = new AsyncRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface);
}
if (annotation.annotationType() == RRemoteReactive.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteReactive) annotation).value();
ReactiveRemoteProxy proxy = new ReactiveRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface);
}
if (annotation.annotationType() == RRemoteRx.class) {
Class<T> syncInterface = (Class<T>) ((RRemoteRx) annotation).value();
RxRemoteProxy proxy = new RxRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName, this);
return proxy.create(remoteInterface, options, syncInterface);
}
}
SyncRemoteProxy proxy = new SyncRemoteProxy(commandExecutor, name, responseQueueName, responses, redisson, codec, executorId, this);
SyncRemoteProxy proxy = new SyncRemoteProxy(commandExecutor, name, responseQueueName, responses, codec, executorId, this);
return proxy.create(remoteInterface, options);
}
......@@ -155,6 +152,10 @@ public abstract class BaseRemoteService {
return executionTimeoutInMillis;
}
protected <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<>(new CompositeCodec(StringCodec.INSTANCE, codec, codec), commandExecutor, name, null, null, null);
}
protected <T> void scheduleCheck(String mapName, RequestId requestId, RPromise<T> cancelRequest) {
commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
......@@ -163,7 +164,7 @@ public abstract class BaseRemoteService {
return;
}
RMap<String, T> canceledRequests = redisson.getMap(mapName, new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RMap<String, T> canceledRequests = getMap(mapName);
RFuture<T> future = canceledRequests.removeAsync(requestId.toString());
future.onComplete((request, ex) -> {
if (cancelRequest.isDone()) {
......
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.reactive.CommandReactiveExecutor;
import reactor.core.publisher.Mono;
/**
*
* @author Nikita Koksharov
*
*/
public class ReactiveRemoteProxy extends AsyncRemoteProxy {
public ReactiveRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId,
String cancelRequestMapName, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName,
remoteService);
}
@Override
protected List<Class<?>> permittedClasses() {
return Arrays.asList(Mono.class);
}
@Override
protected Object convertResult(RemotePromise<Object> result, Class<?> returnType) {
return ((CommandReactiveExecutor) commandExecutor).reactive(() -> result);
}
}
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.redisson.client.codec.Codec;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.rx.CommandRxExecutor;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
/**
*
* @author Nikita Koksharov
*
*/
public class RxRemoteProxy extends AsyncRemoteProxy {
public RxRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId,
String cancelRequestMapName, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, cancelRequestMapName,
remoteService);
}
@Override
protected List<Class<?>> permittedClasses() {
return Arrays.asList(Completable.class, Single.class, Maybe.class);
}
@Override
protected Object convertResult(RemotePromise<Object> result, Class<?> returnType) {
Flowable<Object> flowable = ((CommandRxExecutor) commandExecutor).flowable(() -> result);
if (returnType == Completable.class) {
return flowable.ignoreElements();
}
if (returnType == Single.class) {
return flowable.singleOrError();
}
return flowable.singleElement();
}
}
......@@ -20,8 +20,8 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentMap;
import org.redisson.RedissonBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;
......@@ -37,8 +37,8 @@ import org.redisson.misc.RPromise;
public class SyncRemoteProxy extends BaseRemoteProxy {
public SyncRemoteProxy(CommandAsyncExecutor commandExecutor, String name, String responseQueueName,
ConcurrentMap<String, ResponseEntry> responses, RedissonClient redisson, Codec codec, String executorId, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, redisson, codec, executorId, remoteService);
ConcurrentMap<String, ResponseEntry> responses, Codec codec, String executorId, BaseRemoteService remoteService) {
super(commandExecutor, name, responseQueueName, responses, codec, executorId, remoteService);
}
public <T> T create(Class<T> remoteInterface, RemoteInvocationOptions options) {
......@@ -120,7 +120,7 @@ public class SyncRemoteProxy extends BaseRemoteProxy {
+ optionsCopy.getAckTimeoutInMillis() + "ms for request: " + request);
}
}
redisson.getBucket(ackName).delete();
new RedissonBucket<>(commandExecutor, ackName).delete();
}
// poll for the response only if expected
......
......@@ -50,19 +50,19 @@ public class CommandRxService extends CommandAsyncService implements CommandRxEx
p.onError(e);
return;
}
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
future.cancel(true);
}
});
future.onComplete((res, e) -> {
if (e != null) {
p.onError(e);
return;
}
p.doOnCancel(new Action() {
@Override
public void run() throws Exception {
future.cancel(true);
}
});
if (res != null) {
p.onNext(res);
}
......
......@@ -235,6 +235,20 @@ public class RedissonListMultimapTest extends BaseTest {
assertThat(map.containsEntry(new SimpleKey("0"), new SimpleValue("2"))).isFalse();
}
@Test
public void testRange() {
RListMultimap<Integer, Integer> map = redisson.getListMultimap("test1");
map.put(1, 1);
map.put(1, 2);
map.put(1, 3);
map.put(1, 4);
map.put(1, 5);
assertThat(map.get(1).range(1)).containsExactly(1, 2);
assertThat(map.get(1).range(1, 3)).containsExactly(2, 3, 4);
}
@Test
public void testRemove() {
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
......
......@@ -34,6 +34,23 @@ public class RedissonListTest extends BaseTest {
assertThat(list.get(1, 2, 3)).containsSequence(2, 3, 4);
}
@Test
public void testRange() {
RList<Integer> list = redisson.getList("list", IntegerCodec.INSTANCE);
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
assertThat(list.range(1)).containsExactly(1, 2);
assertThat(list.range(1, 3)).containsExactly(2, 3, 4);
list.delete();
assertThat(list.range(0, 2)).isEmpty();
}
@Test
public void testSortOrder() {
RList<Integer> list = redisson.getList("list", IntegerCodec.INSTANCE);
......
......@@ -552,6 +552,8 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
@RId(generator = UUIDGenerator.class)
private Serializable id;
private Map<String, String> values = new HashMap<>();
public TestClass() {
}
......@@ -597,6 +599,17 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
return null;
}
public void addEntry(String key, String value) {
values.put(key, value);
}
public void setValues(Map<String, String> values) {
this.values = values;
}
public Map<String, String> getValues() {
return values;
}
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof TestClass) || !this.getClass().equals(obj.getClass())) {
......@@ -763,9 +776,11 @@ public class RedissonLiveObjectServiceTest extends BaseTest {
TestClass ts = new TestClass(new ObjectId(100));
ts.setValue("VALUE");
ts.setContent(new TestREntity("123"));
ts.addEntry("1", "2");
TestClass persisted = service.persist(ts);
assertEquals(2, redisson.getKeys().count());
assertEquals(3, redisson.getKeys().count());
assertEquals(1, persisted.getValues().size());
assertEquals("123", ((TestREntity)persisted.getContent()).getName());
assertEquals(new ObjectId(100), persisted.getId());
assertEquals("VALUE", persisted.getValue());
......
......@@ -4,12 +4,14 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RateIntervalUnit;
......@@ -27,6 +29,15 @@ public class RedissonRateLimiterTest extends BaseTest {
assertThat(rr.getConfig().getRateType()).isEqualTo(RateType.OVERALL);
}
@Test
public void testPermitsExceeding() throws InterruptedException {
RRateLimiter limiter = redisson.getRateLimiter("myLimiter");
limiter.trySetRate(RateType.PER_CLIENT, 1, 1, RateIntervalUnit.SECONDS);
Assertions.assertThatThrownBy(() -> limiter.tryAcquire(20)).hasMessageContaining("Requested permits amount could not exceed defined rate");
assertThat(limiter.tryAcquire()).isTrue();
}
@Test(timeout = 1500)
public void testTryAcquire() {
RRateLimiter rr = redisson.getRateLimiter("acquire");
......
......@@ -22,13 +22,22 @@ import org.junit.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RRemoteService;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.RedissonRxClient;
import org.redisson.api.RemoteInvocationOptions;
import org.redisson.api.annotation.RRemoteAsync;
import org.redisson.api.annotation.RRemoteReactive;
import org.redisson.api.annotation.RRemoteRx;
import org.redisson.codec.FstCodec;
import org.redisson.codec.SerializationCodec;
import org.redisson.remote.RemoteServiceAckTimeoutException;
import org.redisson.remote.RemoteServiceTimeoutException;
import io.reactivex.Completable;
import io.reactivex.Single;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
public class RedissonRemoteServiceTest extends BaseTest {
public static class Pojo {
......@@ -87,6 +96,40 @@ public class RedissonRemoteServiceTest extends BaseTest {
RFuture<Void> timeoutMethod();
}
@RRemoteReactive(RemoteInterface.class)
public interface RemoteInterfaceReactive {
Mono<Void> cancelMethod();
Mono<Void> voidMethod(String name, Long param);
Mono<Long> resultMethod(Long value);
Mono<Void> errorMethod();
Mono<Void> errorMethodWithCause();
Mono<Void> timeoutMethod();
}
@RRemoteRx(RemoteInterface.class)
public interface RemoteInterfaceRx {
Completable cancelMethod();
Completable voidMethod(String name, Long param);
Single<Long> resultMethod(Long value);
Completable errorMethod();
Completable errorMethodWithCause();
Completable timeoutMethod();
}
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceWrongMethodAsync {
......@@ -272,9 +315,33 @@ public class RedissonRemoteServiceTest extends BaseTest {
assertThat(iterations.get()).isLessThan(Integer.MAX_VALUE / 2);
assertThat(executor.awaitTermination(1, TimeUnit.SECONDS)).isTrue();
assertThat(executor.awaitTermination(2, TimeUnit.SECONDS)).isTrue();
}
@Test
public void testCancelReactive() throws InterruptedException {
RedissonReactiveClient r1 = Redisson.createReactive(createConfig());
AtomicInteger iterations = new AtomicInteger();
ExecutorService executor = Executors.newSingleThreadExecutor();
r1.getKeys().flushall();
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl(iterations), 1, executor);
RedissonReactiveClient r2 = Redisson.createReactive(createConfig());
RemoteInterfaceReactive ri = r2.getRemoteService().get(RemoteInterfaceReactive.class);
Mono<Void> f = ri.cancelMethod();
Disposable t = f.doOnSubscribe(s -> s.request(1)).subscribe();
Thread.sleep(500);
t.dispose();
executor.shutdown();
r1.shutdown();
r2.shutdown();
assertThat(iterations.get()).isLessThan(Integer.MAX_VALUE / 2);
assertThat(executor.awaitTermination(2, TimeUnit.SECONDS)).isTrue();
}
@Test(expected = IllegalArgumentException.class)
public void testWrongMethodAsync() throws InterruptedException {
......@@ -303,6 +370,40 @@ public class RedissonRemoteServiceTest extends BaseTest {
r1.shutdown();
r2.shutdown();
}
@Test
public void testReactive() throws InterruptedException {
RedissonReactiveClient r1 = Redisson.createReactive(createConfig());
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl());
RedissonReactiveClient r2 = Redisson.createReactive(createConfig());
RemoteInterfaceReactive ri = r2.getRemoteService().get(RemoteInterfaceReactive.class);
Mono<Void> f = ri.voidMethod("someName", 100L);
f.block();
Mono<Long> resFuture = ri.resultMethod(100L);
assertThat(resFuture.block()).isEqualTo(200);
r1.shutdown();
r2.shutdown();
}
@Test
public void testRx() throws InterruptedException {
RedissonRxClient r1 = Redisson.createRx(createConfig());
r1.getRemoteService().register(RemoteInterface.class, new RemoteImpl());
RedissonRxClient r2 = Redisson.createRx(createConfig());
RemoteInterfaceRx ri = r2.getRemoteService().get(RemoteInterfaceRx.class);
Completable f = ri.voidMethod("someName", 100L);
f.blockingGet();
Single<Long> resFuture = ri.resultMethod(100L);
assertThat(resFuture.blockingGet()).isEqualTo(200);
r1.shutdown();
r2.shutdown();
}
@Test
public void testExecutorAsync() throws InterruptedException {
......
......@@ -13,6 +13,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.joor.Reflect;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
......@@ -105,7 +106,8 @@ public class RedissonScheduledExecutorServiceTest extends BaseTest {
assertThat(f.isSuccess()).isTrue();
assertThat(System.currentTimeMillis() - start).isBetween(11000L, 11500L);
Deencapsulation.setField(RedissonExecutorService.class, "RESULT_OPTIONS", RemoteInvocationOptions.defaults().noAck().expectResultWithin(3, TimeUnit.SECONDS));
Reflect.onClass(RedissonExecutorService.class).set("RESULT_OPTIONS", RemoteInvocationOptions.defaults().noAck().expectResultWithin(3, TimeUnit.SECONDS));
executor = redisson.getExecutorService("test", ExecutorOptions.defaults().taskRetryInterval(5, TimeUnit.SECONDS));
start = System.currentTimeMillis();
RScheduledFuture<?> f1 = executor.schedule(new ScheduledCallableTask(), 5, TimeUnit.SECONDS);
......
......@@ -34,7 +34,6 @@ import org.redisson.BaseTest;
import org.redisson.RedisRunner;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.client.codec.JsonJacksonMapCodec;
import org.redisson.codec.TypedJsonJacksonCodec;
import org.redisson.config.Config;
import org.redisson.jcache.configuration.RedissonConfiguration;
......@@ -44,6 +43,38 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
public class JCacheTest extends BaseTest {
@Test
public void testPutAll() throws Exception {
RedisProcess runner = new RedisRunner()
.nosave()
.randomDir()
.port(6311)
.run();
URL configUrl = getClass().getResource("redisson-jcache.json");
Config cfg = Config.fromJSON(configUrl);
Configuration<String, String> config = RedissonConfiguration.fromConfig(cfg);
Cache<String, String> cache = Caching.getCachingProvider().getCacheManager()
.createCache("test", config);
Map<String, String> map = new HashMap<>();
for (int i = 0; i < 10000; i++) {
map.put("" + i, "" + i);
}
long start = System.currentTimeMillis();
cache.putAll(map);
System.out.println(System.currentTimeMillis() - start);
for (int i = 0; i < 10000; i++) {
assertThat(cache.containsKey("" + i)).isTrue();
}
cache.close();
runner.stop();
}
@Test
public void testRemoveAll() throws Exception {
RedisProcess runner = new RedisRunner()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册