提交 a1d0ff1a 编写于 作者: N Nikita Koksharov

Feature - Micronaut integration #3657

上级 9d7a90b7
......@@ -62,6 +62,7 @@
<module>redisson-hibernate</module>
<module>redisson-helidon</module>
<module>redisson-quarkus</module>
<module>redisson-micronaut</module>
</modules>
<profiles>
......
# Redis integration with Micronaut
Integrates Redisson with [Micronaut](https://micronaut.io/) framework.
Supports Micronaut 2.0.x - 2.5.x
## Usage
### 1. Add `redisson-micronaut` dependency into your project:
Maven
```xml
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-micronaut</artifactId>
<version>3.16.0</version>
</dependency>
```
Gradle
```groovy
compile 'org.redisson:redisson-micronaut:3.16.0'
```
### 2. Add settings into `application.yml` file
#### 2.1 Redisson settings
Config structure is a Redisson YAML configuration -
[single mode](https://github.com/redisson/redisson/wiki/2.-Configuration#262-single-instance-yaml-config-format),
[replicated mode](https://github.com/redisson/redisson/wiki/2.-Configuration#252-replicated-yaml-config-format),
[cluster mode](https://github.com/redisson/redisson/wiki/2.-Configuration#242-cluster-yaml-config-format),
[sentinel mode](https://github.com/redisson/redisson/wiki/2.-Configuration#272-sentinel-yaml-config-format),
[proxy mode](https://github.com/redisson/redisson/wiki/2.-Configuration#292-proxy-mode-yaml-config-format)
NOTE: Setting names in camel case should be joined with hyphens (-).
Config example:
```yaml
redisson:
single-server-config:
address: "redis://127.0.0.1:6379"
threads: 16
netty-threads: 32
```
#### 2.2 Cache settings
|Setting name|Type|Description|
|------------|----|-----------|
|redisson.caches.*.max-size|java.lang.Integer|Max size of this cache. Superfluous elements are evicted using LRU algorithm. If <code>0</code> the cache is unbounded (default).|
|redisson.caches.*.codec|java.lang.Class|Redis data codec applied to cache entries. Default is MarshallingCodec codec.|
|redisson.caches.*.expire-after-write|java.time.Duration|Cache entry time to live duration applied after each write operation.|
|redisson.caches.*.expire-after-access|java.time.Duration|Cache entry time to live duration applied after each read operation.|
Config example:
```yaml
redisson:
single-server-config:
address: "redis://127.0.0.1:6379"
caches:
my-cache1:
expire-after-write: 10s
expire-after-access: 3s
max-size: 1000
codec: org.redisson.codec.MarshallingCodec
my-cache2:
expire-after-write: 200s
expire-after-access: 30s
```
#### 2.3 Session settings
[Session](https://docs.micronaut.io/latest/api/io/micronaut/session/Session.html) store implementation.
Additional settings to [HttpSessionConfiguration](https://docs.micronaut.io/2.5.4/api/io/micronaut/session/http/HttpSessionConfiguration.html) object:
|Setting name|Type|Description|
|------------|----|-----------|
|micronaut.session.http.redisson.enabled|java.lang.Boolean|Enables Session store|
|micronaut.session.http.redisson.key-prefix|java.lang.Integer|Defines string prefix applied to all objects stored in Redis.|
|micronaut.session.http.redisson.codec|java.lang.Class|Redis data codec applied to cache entries. Default is MarshallingCodec codec.|
|micronaut.session.http.redisson.update-mode|java.lang.String|Defines session attributes update mode.<br/>`WRITE_BEHIND` - session changes stored asynchronously.<br/>`AFTER_REQUEST` - session changes stored only on `SessionStore#save(Session)` method invocation. Default value.|
|micronaut.session.http.redisson.broadcastSessionUpdates|java.lang.Boolean|Defines broadcasting of session updates across all micronaut services.|
Config example:
```yaml
micronaut:
session:
http:
redisson:
enabled: true
update-mode: "WRITE_BEHIND"
broadcast-session-updates: false
```
### 3 Use Redisson
#### 3.1 Redisson instance
```java
@Inject
private RedissonClient redisson;
```
#### 3.2 Cache
```java
@Singleton
@CacheConfig("my-cache1")
public class CarsService {
@Cacheable
public List<String> listAll() {
// ...
}
@CachePut(parameters = {"type"})
public List<String> addCar(String type, String description) {
// ...
}
@CacheInvalidate(parameters = {"type"})
public void removeCar(String type, String description) {
// ...
}
}
```
Consider __[Redisson PRO](https://redisson.pro)__ version for **ultra-fast performance** and **support by SLA**.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.redisson</groupId>
<artifactId>redisson-parent</artifactId>
<version>3.15.7-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>
<artifactId>redisson-micronaut</artifactId>
<packaging>jar</packaging>
<name>Redisson/Micronaut Cache and Integration</name>
<build>
<plugins>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<version>3.0</version>
<configuration>
<basedir>${basedir}</basedir>
<header>${basedir}/../header.txt</header>
<quiet>false</quiet>
<failIfMissing>true</failIfMissing>
<aggregate>false</aggregate>
<includes>
<include>src/main/java/org/redisson/</include>
</includes>
<excludes>
<exclude>target/**</exclude>
</excludes>
<useDefaultExcludes>true</useDefaultExcludes>
<mapping>
<java>JAVADOC_STYLE</java>
</mapping>
<strictCheck>true</strictCheck>
<useDefaultMapping>true</useDefaultMapping>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
<annotationProcessorPaths>
<path>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-inject-java</artifactId>
<version>2.0.0</version>
</path>
<path>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-validation</artifactId>
<version>2.0.0</version>
</path>
</annotationProcessorPaths>
</configuration>
<executions>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<configuration>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
<annotationProcessorPaths>
<path>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-inject-java</artifactId>
<version>2.0.0</version>
</path>
<path>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-validation</artifactId>
<version>2.0.0</version>
</path>
</annotationProcessorPaths>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.micronaut.cache</groupId>
<artifactId>micronaut-cache-core</artifactId>
<version>2.4.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-session</artifactId>
<version>2.5.5</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-management</artifactId>
<version>2.5.5</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-inject-java</artifactId>
<version>2.5.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.18.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>5.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micronaut.test</groupId>
<artifactId>micronaut-test-junit5</artifactId>
<version>[2.0.0,)</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut;
import io.micronaut.context.annotation.ConfigurationBuilder;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Requires;
import org.redisson.config.*;
/**
*
* @author Nikita Koksharov
*
*/
@ConfigurationProperties("redisson")
@Requires(missingBeans = Config.class)
@Requires(property = "redisson")
public class RedissonConfiguration extends Config {
public RedissonConfiguration() {
}
@Override
public SingleServerConfig getSingleServerConfig() {
if (isNotDefined()) {
return useSingleServer();
}
return super.getSingleServerConfig();
}
@Override
@ConfigurationBuilder("singleServerConfig")
protected void setSingleServerConfig(SingleServerConfig singleConnectionConfig) {
super.setSingleServerConfig(singleConnectionConfig);
}
@Override
public ClusterServersConfig getClusterServersConfig() {
if (isNotDefined()) {
return useClusterServers();
}
return super.getClusterServersConfig();
}
@Override
@ConfigurationBuilder(value = "clusterServersConfig", includes = {"nodeAddresses"})
protected void setClusterServersConfig(ClusterServersConfig clusterServersConfig) {
super.setClusterServersConfig(clusterServersConfig);
}
private boolean isNotDefined() {
return super.getSingleServerConfig() == null
&& super.getClusterServersConfig() == null
&& super.getReplicatedServersConfig() == null
&& super.getSentinelServersConfig() == null
&& super.getMasterSlaveServersConfig() == null;
}
@Override
public ReplicatedServersConfig getReplicatedServersConfig() {
if (isNotDefined()) {
return useReplicatedServers();
}
return super.getReplicatedServersConfig();
}
@Override
@ConfigurationBuilder(value = "replicatedServersConfig", includes = {"nodeAddresses"})
protected void setReplicatedServersConfig(ReplicatedServersConfig replicatedServersConfig) {
super.setReplicatedServersConfig(replicatedServersConfig);
}
@Override
public SentinelServersConfig getSentinelServersConfig() {
if (isNotDefined()) {
return useSentinelServers();
}
return super.getSentinelServersConfig();
}
@Override
@ConfigurationBuilder(value = "sentinelServersConfig", includes = {"sentinelAddresses"})
protected void setSentinelServersConfig(SentinelServersConfig sentinelConnectionConfig) {
super.setSentinelServersConfig(sentinelConnectionConfig);
}
@Override
public MasterSlaveServersConfig getMasterSlaveServersConfig() {
if (isNotDefined()) {
return useMasterSlaveServers();
}
return super.getMasterSlaveServersConfig();
}
@Override
@ConfigurationBuilder(value = "masterSlaveServersConfig", includes = {"slaveAddresses"})
protected void setMasterSlaveServersConfig(MasterSlaveServersConfig masterSlaveConnectionConfig) {
super.setMasterSlaveServersConfig(masterSlaveConnectionConfig);
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut;
import io.micronaut.context.annotation.*;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.scheduling.TaskExecutors;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.config.Config;
import org.redisson.micronaut.cache.RedissonSyncCache;
import org.redisson.micronaut.cache.RedissonCacheConfiguration;
import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
/**
*
* @author Nikita Koksharov
*
*/
@Factory
public class RedissonFactory {
@Requires(beans = Config.class)
@Singleton
@Bean(preDestroy = "shutdown")
public RedissonClient redisson(Config config) {
return Redisson.create(config);
}
@EachBean(RedissonCacheConfiguration.class)
public RedissonSyncCache cache(@Parameter RedissonCacheConfiguration configuration,
RedissonClient redisson,
ConversionService<?> conversionService,
@Named(TaskExecutors.IO) ExecutorService executorService) {
Codec codec = Optional.ofNullable(configuration.getCodec())
.orElse(redisson.getConfig().getCodec());
if (configuration.getExpireAfterAccess().toMillis() != 0
|| configuration.getExpireAfterWrite().toMillis() != 0
|| configuration.getMaxSize() != 0) {
RMapCache<Object, Object> mapCache = redisson.getMapCache(configuration.getName(), codec);
return new RedissonSyncCache(conversionService, mapCache, mapCache, executorService, configuration);
}
RMap<Object, Object> map = redisson.getMap(configuration.getName(), codec);
return new RedissonSyncCache(conversionService, null, map, executorService, configuration);
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.cache;
import io.micronaut.cache.AsyncCache;
import io.micronaut.core.convert.ConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.ArgumentUtils;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonAsyncCache implements AsyncCache<RMap<Object, Object>> {
private final ConversionService<?> conversionService;
private final RMapCache<Object, Object> mapCache;
private final RMap<Object, Object> map;
private final ExecutorService executorService;
public RedissonAsyncCache(RMapCache<Object, Object> mapCache,
RMap<Object, Object> map,
ExecutorService executorService,
ConversionService<?> conversionService) {
this.mapCache = mapCache;
this.map = map;
this.executorService = executorService;
this.conversionService = conversionService;
}
@Override
public <T> CompletableFuture<Optional<T>> get(Object key, Argument<T> requiredType) {
ArgumentUtils.requireNonNull("key", key);
return map.getAsync(key)
.thenApply(v -> {
if (v != null) {
return Optional.of((T)conversionService.convert(v, ConversionContext.of(requiredType)));
}
return Optional.<T>empty();
})
.toCompletableFuture();
}
@Override
public <T> CompletableFuture<T> get(Object key, Argument<T> requiredType, Supplier<T> supplier) {
ArgumentUtils.requireNonNull("key", key);
return get(key, requiredType).thenCompose(existingValue -> {
if (existingValue.isPresent()) {
return CompletableFuture.completedFuture(existingValue.get());
} else {
return CompletableFuture.supplyAsync(supplier, executorService)
.thenApply(value -> {
put(key, value);
return value;
});
}
});
}
@Override
public <T> CompletableFuture<Optional<T>> putIfAbsent(Object key, T value) {
ArgumentUtils.requireNonNull("key", key);
ArgumentUtils.requireNonNull("value", value);
return map.putIfAbsentAsync(key, value)
.thenApply(v -> Optional.ofNullable((T) v))
.toCompletableFuture();
}
@Override
public CompletableFuture<Boolean> put(Object key, Object value) {
ArgumentUtils.requireNonNull("key", key);
ArgumentUtils.requireNonNull("value", value);
return map.fastPutAsync(key, value)
.thenApply(counter -> true)
.toCompletableFuture();
}
@Override
public CompletableFuture<Boolean> invalidate(Object key) {
ArgumentUtils.requireNonNull("key", key);
return map.fastRemoveAsync(key)
.thenApply(counter -> true)
.toCompletableFuture();
}
@Override
public CompletableFuture<Boolean> invalidateAll() {
return map.deleteAsync()
.toCompletableFuture();
}
@Override
public String getName() {
return map.getName();
}
@Override
public RMap<Object, Object> getNativeCache() {
return map;
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.cache;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.core.naming.Named;
import org.redisson.client.codec.Codec;
import java.time.Duration;
/**
* Micronaut Cache settings.
*
* @author Nikita Koksharov
*
*/
@EachProperty("redisson.caches")
public class RedissonCacheConfiguration implements Named {
private final String name;
private Codec codec;
private Duration expireAfterWrite;
private Duration expireAfterAccess;
private int maxSize;
public RedissonCacheConfiguration(@Parameter String name) {
this.name = name;
}
@NonNull
@Override
public String getName() {
return name;
}
public Codec getCodec() {
return codec;
}
/**
* Redis data codec applied to cache entries.
* Default is MarshallingCodec codec
*
* @see org.redisson.client.codec.Codec
* @see org.redisson.codec.MarshallingCodec
*
* @param codec - data codec
* @return config
*/
public void setCodec(Codec codec) {
this.codec = codec;
}
public Duration getExpireAfterWrite() {
return expireAfterWrite;
}
/**
* Cache entry time to live duration applied after each write operation.
*
* @param expireAfterWrite - time to live duration
*/
public void setExpireAfterWrite(Duration expireAfterWrite) {
this.expireAfterWrite = expireAfterWrite;
}
public Duration getExpireAfterAccess() {
return expireAfterAccess;
}
/**
* Cache entry time to live duration applied after each read operation.
*
* @param expireAfterAccess - time to live duration
*/
public void setExpireAfterAccess(Duration expireAfterAccess) {
this.expireAfterAccess = expireAfterAccess;
}
public int getMaxSize() {
return maxSize;
}
/**
* Max size of this cache. Superfluous elements are evicted using LRU algorithm.
*
* @param maxSize - max size
* If <code>0</code> the cache is unbounded (default).
*/
public void setMaxSize(int maxSize) {
this.maxSize = maxSize;
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.cache;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.micronaut.cache.AbstractMapBasedSyncCache;
import io.micronaut.cache.AsyncCache;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.util.ArgumentUtils;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSyncCache extends AbstractMapBasedSyncCache<RMap<Object, Object>> {
private final ConversionService<?> conversionService;
private final ExecutorService executorService;
private final RedissonCacheConfiguration configuration;
private final RMapCache<Object, Object> mapCache;
private final RMap<Object, Object> map;
public RedissonSyncCache(ConversionService<?> conversionService,
RMapCache<Object, Object> mapCache,
RMap<Object, Object> map,
ExecutorService executorService,
RedissonCacheConfiguration configuration) {
super(conversionService, map);
this.executorService = executorService;
this.configuration = configuration;
this.mapCache = mapCache;
this.map = map;
this.conversionService = conversionService;
if (configuration.getMaxSize() != 0) {
mapCache.setMaxSize(configuration.getMaxSize());
}
}
@Override
public String getName() {
return getNativeCache().getName();
}
@NonNull
@Override
public <T> Optional<T> putIfAbsent(@NonNull Object key, @NonNull T value) {
ArgumentUtils.requireNonNull("key", key);
ArgumentUtils.requireNonNull("value", value);
T res;
if (mapCache != null) {
res = (T) mapCache.putIfAbsent(key, value, configuration.getExpireAfterWrite().toMillis(), TimeUnit.MILLISECONDS,
configuration.getExpireAfterAccess().toMillis(), TimeUnit.MILLISECONDS);
} else {
res = (T) mapCache.putIfAbsent(key, value);
}
return Optional.ofNullable(res);
}
@NonNull
@Override
public <T> T putIfAbsent(@NonNull Object key, @NonNull Supplier<T> value) {
ArgumentUtils.requireNonNull("key", key);
ArgumentUtils.requireNonNull("value", value);
T val = value.get();
T res;
if (mapCache != null) {
res = (T) mapCache.putIfAbsent(key, val, configuration.getExpireAfterWrite().toMillis(), TimeUnit.MILLISECONDS,
configuration.getExpireAfterAccess().toMillis(), TimeUnit.MILLISECONDS);
} else {
res = (T) mapCache.putIfAbsent(key, value);
}
return Optional.ofNullable(res).orElse(val);
}
@Override
public void put(@NonNull Object key, @NonNull Object value) {
ArgumentUtils.requireNonNull("key", key);
ArgumentUtils.requireNonNull("value", value);
if (mapCache != null) {
mapCache.fastPut(key, value, configuration.getExpireAfterWrite().toMillis(), TimeUnit.MILLISECONDS,
configuration.getExpireAfterAccess().toMillis(), TimeUnit.MILLISECONDS);
} else {
mapCache.fastPut(key, value);
}
}
@NonNull
@Override
public AsyncCache<RMap<Object, Object>> async() {
return new RedissonAsyncCache(mapCache, map, executorService, conversionService);
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
import java.io.Serializable;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeMessage implements Serializable {
private String sessionId;
private String nodeId;
public AttributeMessage() {
}
public AttributeMessage(String nodeId, String sessionId) {
this.nodeId = nodeId;
this.sessionId = sessionId;
}
public String getSessionId() {
return sessionId;
}
public String getNodeId() {
return nodeId;
}
protected byte[] toByteArray(Encoder encoder, Object value) throws IOException {
if (value == null) {
return null;
}
ByteBuf buf = encoder.encode(value);
try {
return ByteBufUtil.getBytes(buf);
} finally {
buf.release();
}
}
protected Object toObject(Decoder<?> decoder, byte[] value) throws IOException, ClassNotFoundException {
if (value == null) {
return null;
}
ByteBuf buf = Unpooled.wrappedBuffer(value);
try {
return decoder.decode(buf, null);
} finally {
buf.release();
}
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
import java.util.Set;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeRemoveMessage extends AttributeMessage {
private Set<CharSequence> names;
public AttributeRemoveMessage() {
super();
}
public AttributeRemoveMessage(String nodeId, String sessionId, Set<CharSequence> names) {
super(nodeId, sessionId);
this.names = names;
}
public Set<CharSequence> getNames() {
return names;
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributeUpdateMessage extends AttributeMessage {
private String name;
private byte[] value;
public AttributeUpdateMessage() {
}
public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value, Encoder encoder) throws IOException {
super(nodeId, sessionId);
this.name = name;
this.value = toByteArray(encoder, value);
}
public String getName() {
return name;
}
public Object getValue(Decoder<?> decoder) throws IOException, ClassNotFoundException {
return toObject(decoder, value);
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesClearMessage extends AttributeMessage {
public AttributesClearMessage() {
}
public AttributesClearMessage(String nodeId, String sessionId) {
super(nodeId, sessionId);
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
/**
*
* @author Nikita Koksharov
*
*/
public class AttributesPutAllMessage extends AttributeMessage {
private Map<CharSequence, byte[]> attrs;
public AttributesPutAllMessage() {
}
public AttributesPutAllMessage(String nodeId, String sessionId, Map<CharSequence, Object> attrs, Encoder encoder) throws IOException {
super(nodeId, sessionId);
if (attrs != null) {
this.attrs = new HashMap<>();
for (Entry<CharSequence, Object> entry: attrs.entrySet()) {
this.attrs.put(entry.getKey(), toByteArray(encoder, entry.getValue()));
}
} else {
this.attrs = null;
}
}
public Map<CharSequence, Object> getAttrs(Decoder<?> decoder) throws IOException, ClassNotFoundException {
if (attrs == null) {
return null;
}
Map<CharSequence, Object> result = new HashMap<>();
for (Entry<CharSequence, byte[]> entry: attrs.entrySet()) {
result.put(entry.getKey(), toObject(decoder, entry.getValue()));
}
return result;
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.util.Toggleable;
import io.micronaut.session.http.HttpSessionConfiguration;
import org.redisson.client.codec.Codec;
/**
* Micronaut Session settings.
*
* @author Nikita Koksharov
*/
@ConfigurationProperties("redisson")
public class RedissonHttpSessionConfiguration extends HttpSessionConfiguration implements Toggleable {
public enum UpdateMode {WRITE_BEHIND, AFTER_REQUEST}
private String keyPrefix = "";
private Codec codec;
private UpdateMode updateMode = UpdateMode.AFTER_REQUEST;
private boolean broadcastSessionUpdates = false;
public boolean isBroadcastSessionUpdates() {
return broadcastSessionUpdates;
}
/**
* Defines broadcasting of session updates across all micronaut services.
*
* @param broadcastSessionUpdates - if true then session changes are broadcasted.
*/
public void setBroadcastSessionUpdates(boolean broadcastSessionUpdates) {
this.broadcastSessionUpdates = broadcastSessionUpdates;
}
public UpdateMode getUpdateMode() {
return updateMode;
}
/**
* Defines session attributes update mode.
* <p>
* WRITE_BEHIND - session changes stored asynchronously.
* AFTER_REQUEST - session changes stored only on io.micronaut.session.SessionStore#save(io.micronaut.session.Session) method invocation.
* <p>
* Default is AFTER_REQUEST.
*
* @param updateMode - mode value
*/
public void setUpdateMode(UpdateMode updateMode) {
this.updateMode = updateMode;
}
public Codec getCodec() {
return codec;
}
/**
* Redis data codec applied to session values.
* Default is MarshallingCodec codec
*
* @see org.redisson.client.codec.Codec
* @see org.redisson.codec.MarshallingCodec
*
* @param codec - data codec
* @return config
*/
public void setCodec(Codec codec) {
this.codec = codec;
}
public String getKeyPrefix() {
return keyPrefix;
}
/**
* Defines string prefix applied to all objects stored in Redis.
*
* @param keyPrefix - key prefix value
*/
public void setKeyPrefix(String keyPrefix) {
this.keyPrefix = keyPrefix;
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.micronaut.core.convert.value.MutableConvertibleValues;
import io.micronaut.session.InMemorySession;
import io.micronaut.session.Session;
import org.redisson.api.*;
import org.redisson.client.codec.IntegerCodec;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonSession extends InMemorySession implements Session {
private static final String MAX_INACTIVE_INTERVAL_ATTR = "session:maxInactiveInterval";
private static final String LAST_ACCESSED_TIME_ATTR = "session:lastAccessedTime";
private static final String CREATION_TIME_ATTR = "session:creationTime";
private final RedissonSessionStore redissonManager;
private final RMap<CharSequence, Object> map;
private final RTopic topic;
private final RedissonHttpSessionConfiguration.UpdateMode updateMode;
private Instant creationTime;
private boolean broadcastSessionUpdates;
private Set<String> removedAttributes = Collections.emptySet();
private Map<String, Object> updatedAttributes = Collections.emptyMap();
public RedissonSession(RedissonSessionStore redissonManager,
String id,
RedissonHttpSessionConfiguration.UpdateMode updateMode) {
this(redissonManager, id, updateMode, Duration.ZERO);
}
public RedissonSession(RedissonSessionStore redissonManager,
String id,
RedissonHttpSessionConfiguration.UpdateMode updateMode,
Duration maxInactiveInterval) {
super(id, maxInactiveInterval);
this.redissonManager = redissonManager;
this.updateMode = updateMode;
this.topic = redissonManager.getTopic();
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.AFTER_REQUEST) {
removedAttributes = Collections.newSetFromMap(new ConcurrentHashMap<>());
updatedAttributes = new ConcurrentHashMap<>();
}
this.creationTime = super.getCreationTime();
super.setLastAccessedTime(creationTime);
map = redissonManager.getMap(getId());
}
@NonNull
@Override
public Instant getCreationTime() {
return creationTime;
}
@Override
public MutableConvertibleValues<Object> clear() {
if (!isNew()) {
removedAttributes.addAll(names());
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.WRITE_BEHIND && map != null) {
delete();
}
}
return super.clear();
}
public CompletableFuture<Void> delete() {
RBatch batch = redissonManager.createBatch();
RMapAsync<CharSequence, Object> m = batch.getMap(map.getName(), map.getCodec());
RBucketAsync<Integer> b = batch.getBucket(redissonManager.getNotificationBucket(getId()).getName(), IntegerCodec.INSTANCE);
b.deleteAsync();
m.fastPutAsync(LAST_ACCESSED_TIME_ATTR, 0L);
m.expireAsync(10, TimeUnit.SECONDS);
if (broadcastSessionUpdates) {
RTopicAsync t = batch.getTopic(topic.getChannelNames().get(0));
t.publishAsync(new AttributesClearMessage(redissonManager.getNodeId(), getId()));
}
return batch.executeAsync().thenApply(s -> (Void)null).toCompletableFuture();
}
protected void expireSession() {
if (getMaxInactiveInterval().getSeconds() >= 0) {
RBatch batch = redissonManager.createBatch();
RMapAsync<CharSequence, Object> m = batch.getMap(map.getName(), map.getCodec());
RBucketAsync<Integer> b = batch.getBucket(redissonManager.getNotificationBucket(getId()).getName(), IntegerCodec.INSTANCE);
b.setAsync(1);
b.expireAsync(getMaxInactiveInterval().getSeconds(), TimeUnit.SECONDS);
m.expireAsync(getMaxInactiveInterval().getSeconds() + 10, TimeUnit.SECONDS);
batch.executeAsync();
}
}
protected AttributesPutAllMessage createPutAllMessage(Map<CharSequence, Object> newMap) {
Map<CharSequence, Object> map = new HashMap<>();
for (Map.Entry<CharSequence, Object> entry : newMap.entrySet()) {
map.put(entry.getKey(), entry.getValue());
}
try {
return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map, this.map.getCodec().getMapValueEncoder());
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public Session setMaxInactiveInterval(Duration duration) {
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.WRITE_BEHIND && map != null) {
fastPut(MAX_INACTIVE_INTERVAL_ATTR, duration.toMillis());
expireSession();
}
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.AFTER_REQUEST) {
updatedAttributes.put(MAX_INACTIVE_INTERVAL_ATTR, duration.toMillis());
}
return super.setMaxInactiveInterval(duration);
}
private void fastPut(String name, Object value) {
if (map == null) {
return;
}
map.fastPutAsync(name, value);
try {
if (broadcastSessionUpdates) {
topic.publishAsync(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public Session setLastAccessedTime(Instant instant) {
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.WRITE_BEHIND && map != null) {
fastPut(LAST_ACCESSED_TIME_ATTR, instant.toEpochMilli());
expireSession();
}
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.AFTER_REQUEST) {
updatedAttributes.put(LAST_ACCESSED_TIME_ATTR, instant.toEpochMilli());
}
return super.setLastAccessedTime(instant);
}
public void superPut(CharSequence name, Object value) {
super.put(name, value);
}
@Override
public MutableConvertibleValues<Object> put(CharSequence key, Object value) {
if (value == null) {
return super.put(key, value);
}
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.WRITE_BEHIND && map != null) {
fastPut(key.toString(), value);
}
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.AFTER_REQUEST) {
updatedAttributes.put(key.toString(), value);
removedAttributes.remove(key.toString());
}
return super.put(key, value);
}
public void superRemove(CharSequence key) {
super.remove(key);
}
@Override
public MutableConvertibleValues<Object> remove(CharSequence key) {
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.WRITE_BEHIND && map != null) {
map.fastRemoveAsync(key.toString());
if (broadcastSessionUpdates) {
topic.publishAsync(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<>(Arrays.asList(key))));
}
}
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.AFTER_REQUEST) {
updatedAttributes.remove(key.toString());
removedAttributes.add(key.toString());
}
return super.remove(key);
}
public CompletableFuture<RedissonSession> save() {
Map<CharSequence, Object> newMap = new HashMap<>();
if (isNew() || updateMode == RedissonHttpSessionConfiguration.UpdateMode.WRITE_BEHIND) {
newMap.put(LAST_ACCESSED_TIME_ATTR, getLastAccessedTime().toEpochMilli());
newMap.put(MAX_INACTIVE_INTERVAL_ATTR, getMaxInactiveInterval().toMillis());
newMap.put(CREATION_TIME_ATTR, getCreationTime().toEpochMilli());
for (Map.Entry<CharSequence, Object> entry : attributeMap.entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}
} else {
newMap.putAll(updatedAttributes);
}
if (newMap.isEmpty()) {
return CompletableFuture.completedFuture(this);
}
RBatch batch = redissonManager.createBatch();
RMapAsync<CharSequence, Object> m = batch.getMap(map.getName(), map.getCodec());
m.putAllAsync(newMap);
m.fastRemoveAsync(removedAttributes.toArray(new String[0]));
RBucketAsync<Integer> bucket = batch.getBucket(redissonManager.getNotificationBucket(getId()).getName(), IntegerCodec.INSTANCE);
bucket.setAsync(1);
if (broadcastSessionUpdates) {
RTopicAsync t = batch.getTopic(topic.getChannelNames().get(0));
t.publishAsync(createPutAllMessage(newMap));
if (updateMode == RedissonHttpSessionConfiguration.UpdateMode.AFTER_REQUEST) {
if (!removedAttributes.isEmpty()) {
t.publishAsync(new AttributeRemoveMessage(redissonManager.getNodeId(), getId(), new HashSet<>(removedAttributes)));
}
}
}
removedAttributes.clear();
updatedAttributes.clear();
if (getMaxInactiveInterval().getSeconds() >= 0) {
bucket.expireAsync(getMaxInactiveInterval().getSeconds(), TimeUnit.SECONDS);
m.expireAsync(getMaxInactiveInterval().getSeconds() + 10, TimeUnit.SECONDS);
}
return batch.executeAsync().thenApply(b -> this).toCompletableFuture();
}
public void load(Map<CharSequence, Object> attrs) {
Long creationTime = (Long) attrs.remove(CREATION_TIME_ATTR);
if (creationTime != null) {
this.creationTime = Instant.ofEpochMilli(creationTime);
}
Long lastAccessedTime = (Long) attrs.remove(LAST_ACCESSED_TIME_ATTR);
if (lastAccessedTime != null) {
super.setLastAccessedTime(Instant.ofEpochMilli(lastAccessedTime));
}
Long maxInactiveInterval = (Long) attrs.remove(MAX_INACTIVE_INTERVAL_ATTR);
if (maxInactiveInterval != null) {
super.setMaxInactiveInterval(Duration.ofMillis(maxInactiveInterval));
}
setNew(false);
for (Map.Entry<CharSequence, Object> entry : attrs.entrySet()) {
attributeMap.put(entry.getKey(), entry.getValue());
}
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
import io.micronaut.context.BeanLocator;
import io.micronaut.context.annotation.Primary;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.serialize.ObjectSerializer;
import io.micronaut.core.util.StringUtils;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.session.InMemorySessionStore;
import io.micronaut.session.SessionIdGenerator;
import io.micronaut.session.SessionSettings;
import io.micronaut.session.SessionStore;
import io.micronaut.session.event.SessionCreatedEvent;
import io.micronaut.session.event.SessionDeletedEvent;
import io.micronaut.session.event.SessionExpiredEvent;
import org.redisson.api.*;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
/**
*
* @author Nikita Koksharov
*
*/
@Singleton
@Primary
@Requires(property = RedissonSessionStore.ENABLED, value = StringUtils.TRUE)
@Replaces(InMemorySessionStore.class)
public class RedissonSessionStore implements SessionStore<RedissonSession>, PatternMessageListener<String>, MessageListener<String> {
public static final String ENABLED = SessionSettings.HTTP + ".redisson.enabled";
private static final String SESSION_PREFIX = "redisson:session:";
private MessageListener<AttributeMessage> messageListener;
private final String nodeId = UUID.randomUUID().toString();
private static final Logger LOG = LoggerFactory.getLogger(RedissonSessionStore.class);
private RPatternTopic deletedTopic;
private RPatternTopic expiredTopic;
private RTopic createdTopic;
private RedissonClient redisson;
private final SessionIdGenerator sessionIdGenerator;
private RedissonHttpSessionConfiguration sessionConfiguration;
private final ApplicationEventPublisher eventPublisher;
public RedissonSessionStore(
RedissonClient redisson,
SessionIdGenerator sessionIdGenerator,
RedissonHttpSessionConfiguration sessionConfiguration,
ApplicationEventPublisher eventPublisher) {
this.sessionIdGenerator = sessionIdGenerator;
this.sessionConfiguration = sessionConfiguration;
this.eventPublisher = eventPublisher;
this.redisson = redisson;
deletedTopic = redisson.getPatternTopic("__keyevent@*:del", StringCodec.INSTANCE);
expiredTopic = redisson.getPatternTopic("__keyevent@*:expired", StringCodec.INSTANCE);
createdTopic = redisson.getTopic(getEventsChannelPrefix(), StringCodec.INSTANCE);
deletedTopic.addListener(String.class, this);
expiredTopic.addListener(String.class, this);
createdTopic.addListener(String.class, this);
if (sessionConfiguration.isBroadcastSessionUpdates()) {
RTopic updatesTopic = getTopic();
messageListener = new MessageListener<AttributeMessage>() {
@Override
public void onMessage(CharSequence channel, AttributeMessage msg) {
if (msg.getNodeId().equals(nodeId)) {
return;
}
findSession(msg.getSessionId()).thenAccept(s -> {
if (s.isPresent()) {
return;
}
try {
RedissonSession session = s.get();
if (msg instanceof AttributeRemoveMessage) {
for (CharSequence name : ((AttributeRemoveMessage)msg).getNames()) {
session.superRemove(name);
}
}
if (msg instanceof AttributesClearMessage) {
deleteSession(session.getId());
}
if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
Map<CharSequence, Object> attrs = m.getAttrs(getCodec().getMapValueDecoder());
session.load(attrs);
}
if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superPut(m.getName(), m.getValue(getCodec().getMapValueDecoder()));
}
} catch (Exception e) {
LOG.error("Unable to handle topic message", e);
}
});
}
};
updatesTopic.addListener(AttributeMessage.class, messageListener);
}
}
String getEventsChannelPrefix() {
return sessionConfiguration.getKeyPrefix() + "sessions:created:";
}
String getExpiredKeyPrefix() {
return sessionConfiguration.getKeyPrefix() + "sessions:expires:";
}
@Override
public RedissonSession newSession() {
return new RedissonSession(this, sessionIdGenerator.generateId(),
sessionConfiguration.getUpdateMode(), sessionConfiguration.getMaxInactiveInterval());
}
@Override
public CompletableFuture<Optional<RedissonSession>> findSession(String id) {
return loadSession(id, false);
}
@Override
public CompletableFuture<Boolean> deleteSession(String id) {
return loadSession(id, false).thenCompose(optional -> {
return optional.map(s -> {
return s.delete().thenApply(r -> {
return true;
});
}).orElse(CompletableFuture.completedFuture(false));
}).toCompletableFuture();
}
@Override
public CompletableFuture<RedissonSession> save(RedissonSession session) {
CompletableFuture<RedissonSession> f = session.save();
return f.thenCompose(v -> {
if (session.isNew()) {
return createdTopic.publishAsync(v.getId()).thenApply(val -> v);
}
return CompletableFuture.completedFuture(session);
});
}
@Override
public void onMessage(CharSequence pattern, CharSequence channel, String body) {
if (deletedTopic.getPatternNames().contains(pattern.toString())) {
if (!body.contains(SESSION_PREFIX +"notification:")) {
return;
}
String id = body.split(SESSION_PREFIX +"notification:")[1];
loadSession(id, true).whenComplete((r, e) -> {
r.ifPresent(v -> {
eventPublisher.publishEvent(new SessionDeletedEvent(v));
});
});
} else if (expiredTopic.getPatternNames().contains(pattern.toString())) {
if (!body.contains(SESSION_PREFIX +"notification:")) {
return;
}
String id = body.split(SESSION_PREFIX +"notification:")[1];
loadSession(id, true).whenComplete((r, e) -> {
r.ifPresent(v -> {
eventPublisher.publishEvent(new SessionExpiredEvent(v));
});
});
}
}
private CompletableFuture<Optional<RedissonSession>> loadSession(String id, boolean useExpired) {
RMap<CharSequence, Object> map = getMap(id);
return map.readAllMapAsync().thenApply(data -> {
if (data.isEmpty()) {
return Optional.<RedissonSession>empty();
}
RedissonSession session = new RedissonSession(this, id,
sessionConfiguration.getUpdateMode());
session.load(data);
if (useExpired || !session.isExpired()) {
return Optional.of(session);
}
return Optional.<RedissonSession>empty();
}).toCompletableFuture();
}
@Override
public void onMessage(CharSequence channel, String id) {
loadSession(id, true).whenComplete((r, e) -> {
r.ifPresent(v -> {
eventPublisher.publishEvent(new SessionCreatedEvent(v));
});
});
}
public RTopic getTopic() {
String keyPrefix = sessionConfiguration.getKeyPrefix();
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
final String name = keyPrefix + separator + "redisson:session_updates";
return redisson.getTopic(name);
}
public String getNodeId() {
return nodeId;
}
public RBatch createBatch() {
return redisson.createBatch();
}
private Codec getCodec() {
return Optional.ofNullable(sessionConfiguration.getCodec()).orElse(redisson.getConfig().getCodec());
}
public RMap<CharSequence, Object> getMap(String sessionId) {
String keyPrefix = sessionConfiguration.getKeyPrefix();
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
String name = keyPrefix + separator + SESSION_PREFIX + sessionId;
return redisson.getMap(name, new CompositeCodec(StringCodec.INSTANCE, getCodec(), getCodec()));
}
public RBucket<Integer> getNotificationBucket(String sessionId) {
String keyPrefix = sessionConfiguration.getKeyPrefix();
String separator = keyPrefix == null || keyPrefix.isEmpty() ? "" : ":";
String name = keyPrefix + separator + SESSION_PREFIX +"notification:" + sessionId;
return redisson.getBucket(name, IntegerCodec.INSTANCE);
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
/**
*
* @author Nikita Koksharov
*
*/
public class SessionCreatedMessage extends AttributeMessage {
public SessionCreatedMessage() {
}
public SessionCreatedMessage(String nodeId, String sessionId) {
super(nodeId, sessionId);
}
}
/**
* Copyright (c) 2013-2021 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.micronaut.session;
/**
*
* @author Nikita Koksharov
*
*/
public class SessionDestroyedMessage extends AttributeMessage {
public SessionDestroyedMessage() {
}
public SessionDestroyedMessage(String nodeId, String sessionId) {
super(nodeId, sessionId);
}
}
Args=--initialize-at-build-time=org.redisson.RedissonReference,org.redisson.executor.CronExpression,org.redisson.liveobject.core.RedissonObjectBuilder
[
["org.redisson.executor.RemoteExecutorServiceAsync"],
["org.redisson.api.RAtomicDoubleReactive"],
["org.redisson.api.RAtomicLongReactive"],
["org.redisson.api.RBatchReactive"],
["org.redisson.api.RBinaryStreamReactive"],
["org.redisson.api.RBitSetReactive"],
["org.redisson.api.RBlockingDequeReactive"],
["org.redisson.api.RBlockingQueueReactive"],
["org.redisson.api.RBucketReactive"],
["org.redisson.api.RBucketsReactive"],
["org.redisson.api.RCollectionReactive"],
["org.redisson.api.RCountDownLatchReactive"],
["org.redisson.api.RDequeReactive"],
["org.redisson.api.RExpirableReactive"],
["org.redisson.api.RGeoReactive"],
["org.redisson.api.RHyperLogLogReactive"],
["org.redisson.api.RIdGeneratorReactive"],
["org.redisson.api.RKeysReactive"],
["org.redisson.api.RLexSortedSetReactive"],
["org.redisson.api.RListMultimapReactive"],
["org.redisson.api.RListReactive"],
["org.redisson.api.RLockReactive"],
["org.redisson.api.RMapCacheReactive"],
["org.redisson.api.RMapReactive"],
["org.redisson.api.RMultimapReactive"],
["org.redisson.api.RObjectReactive"],
["org.redisson.api.RPatternTopicReactive"],
["org.redisson.api.RPermitExpirableSemaphoreReactive"],
["org.redisson.api.RQueueReactive"],
["org.redisson.api.RRateLimiterReactive"],
["org.redisson.api.RReadWriteLockReactive"],
["org.redisson.api.RReliableTopicReactive"],
["org.redisson.api.RRingBufferReactive"],
["org.redisson.api.RScoredSortedSetReactive"],
["org.redisson.api.RScriptReactive"],
["org.redisson.api.RSemaphoreReactive"],
["org.redisson.api.RSetCacheReactive"],
["org.redisson.api.RSetMultimapReactive"],
["org.redisson.api.RSetReactive"],
["org.redisson.api.RSortableReactive"],
["org.redisson.api.RStreamReactive"],
["org.redisson.api.RTimeSeriesReactive"],
["org.redisson.api.RTopicReactive"],
["org.redisson.api.RTransactionReactive"],
["org.redisson.api.RTransferQueueReactive"],
["org.redisson.api.RAtomicDoubleRx"],
["org.redisson.api.RAtomicLongRx"],
["org.redisson.api.RBatchRx"],
["org.redisson.api.RBinaryStreamRx"],
["org.redisson.api.RBitSetRx"],
["org.redisson.api.RBlockingDequeRx"],
["org.redisson.api.RBlockingQueueRx"],
["org.redisson.api.RBucketRx"],
["org.redisson.api.RBucketsRx"],
["org.redisson.api.RCollectionRx"],
["org.redisson.api.RCountDownLatchRx"],
["org.redisson.api.RDequeRx"],
["org.redisson.api.RExpirableRx"],
["org.redisson.api.RGeoRx"],
["org.redisson.api.RHyperLogLogRx"],
["org.redisson.api.RIdGeneratorRx"],
["org.redisson.api.RKeysRx"],
["org.redisson.api.RLexSortedSetRx"],
["org.redisson.api.RListMultimapRx"],
["org.redisson.api.RListRx"],
["org.redisson.api.RLockRx"],
["org.redisson.api.RMapCacheRx"],
["org.redisson.api.RMapRx"],
["org.redisson.api.RMultimapRx"],
["org.redisson.api.RObjectRx"],
["org.redisson.api.RPatternTopicRx"],
["org.redisson.api.RPermitExpirableSemaphoreRx"],
["org.redisson.api.RQueueRx"],
["org.redisson.api.RRateLimiterRx"],
["org.redisson.api.RReadWriteLockRx"],
["org.redisson.api.RReliableTopicRx"],
["org.redisson.api.RRingBufferRx"],
["org.redisson.api.RScoredSortedSetRx"],
["org.redisson.api.RScriptRx"],
["org.redisson.api.RSemaphoreRx"],
["org.redisson.api.RSetCacheRx"],
["org.redisson.api.RSetMultimapRx"],
["org.redisson.api.RSetRx"],
["org.redisson.api.RSortableRx"],
["org.redisson.api.RStreamRx"],
["org.redisson.api.RTimeSeriesRx"],
["org.redisson.api.RTopicRx"],
["org.redisson.api.RTransactionRx"],
["org.redisson.api.RTransferQueueRx"]
]
[
{
"name":"io.netty.buffer.AbstractByteBufAllocator",
"allDeclaredMethods":true
},
{
"name":"io.netty.buffer.AbstractReferenceCountedByteBuf",
"fields":[{"name":"refCnt", "allowUnsafeAccess":true}]
},
{
"name":"io.netty.util.ReferenceCountUtil",
"allDeclaredMethods":true
},
{
"name":"io.netty.channel.EventLoopGroup",
"allDeclaredMethods":true
},
{
"name":"io.netty.channel.socket.nio.NioDatagramChannel",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"io.netty.channel.socket.nio.NioSocketChannel",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"io.netty.handler.codec.ByteToMessageDecoder",
"methods":[
{"name":"channelInactive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] },
{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] },
{"name":"channelReadComplete","parameterTypes":["io.netty.channel.ChannelHandlerContext"] },
{"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }
]
},
{
"name":"io.netty.handler.codec.MessageToMessageDecoder",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
},
{
"name":"io.netty.handler.codec.MessageToMessageEncoder",
"methods":[{"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }]
},
{
"name":"io.netty.handler.codec.dns.DatagramDnsQueryEncoder"
},
{
"name":"io.netty.resolver.dns.DnsNameResolver$1"
},
{
"name":"io.netty.resolver.dns.DnsNameResolver$3"
},
{
"name":"io.netty.resolver.dns.DnsNameResolver$DnsResponseHandler",
"methods":[
{"name":"channelActive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] },
{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] },
{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }
]
},
{
"name":"io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields",
"fields":[{"name":"producerLimit", "allowUnsafeAccess":true}]
},
{
"name":"io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields",
"fields":[{"name":"consumerIndex", "allowUnsafeAccess":true}]
},
{
"name":"io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields",
"fields":[{"name":"producerIndex", "allowUnsafeAccess":true}]
},
{
"name":"io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField",
"fields":[{"name":"consumerIndex", "allowUnsafeAccess":true}]
},
{
"name":"io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField",
"fields":[{"name":"producerIndex", "allowUnsafeAccess":true}]
},
{
"name":"io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField",
"fields":[{"name":"producerLimit", "allowUnsafeAccess":true}]
},
{
"name":"java.sql.Date"
},
{
"name":"java.sql.Timestamp"
},
{
"name":"java.util.Collections$ReverseComparator2",
"allDeclaredFields":true
},
{
"name":"java.util.Collections$UnmodifiableCollection",
"allDeclaredFields":true
},
{
"name":"java.util.Collections$UnmodifiableList",
"allDeclaredFields":true
},
{
"name":"java.util.Collections$UnmodifiableMap",
"allDeclaredFields":true
},
{
"name":"java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet",
"allDeclaredFields":true,
"methods":[{"name":"<init>","parameterTypes":["java.util.Set"] }]
},
{
"name":"java.util.Collections$UnmodifiableRandomAccessList",
"allDeclaredFields":true
},
{
"name":"java.util.Collections$UnmodifiableSet",
"allDeclaredFields":true
},
{
"name":"java.util.Collections$UnmodifiableSortedMap",
"allDeclaredFields":true
},
{
"name":"java.util.Collections$UnmodifiableSortedSet",
"allDeclaredFields":true
},
{
"name":"java.util.EnumMap",
"fields":[{"name":"keyType"}]
},
{
"name":"java.util.EnumSet$SerializationProxy",
"fields":[
{"name":"elementType"},
{"name":"elements"}
]
},
{
"name":"java.util.HashSet",
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.redisson.RedissonAtomicDouble",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonAtomicLong",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBatch",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBinaryStream",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBitSet",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBlockingDeque",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBlockingQueue",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBloomFilter",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBoundedBlockingQueue",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBucket",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonBuckets",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonCountDownLatch",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonDelayedQueue",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonDeque",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonDoubleAdder",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonExecutorService",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonFairLock",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonGeo",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonHyperLogLog",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonIdGenerator",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonKeys",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonLexSortedSet",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonList",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonListMultimap",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonListMultimapCache",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonListMultimapValues",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonLiveObjectService",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonLocalCachedMap",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonLock",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonLongAdder",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonMap",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonMapCache",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonMultiLock",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonMultimap",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonMultimapCache",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonPatternTopic",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonPermitExpirableSemaphore",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonPriorityBlockingDeque",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonPriorityBlockingQueue",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonPriorityDeque",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonPriorityQueue",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonQueue",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonQueueSemaphore",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonRateLimiter",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonReadLock",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonReadWriteLock",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonRedLock",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonReliableTopic",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonRemoteService",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonRingBuffer",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonScoredSortedSet",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonScript",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonSemaphore",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonSet",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonSetCache",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonSetMultimap",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonSetMultimapCache",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonSetMultimapValues",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonSortedSet",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonSpinLock",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonStream",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonTimeSeries",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonTopic",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonTransferQueue",
"allDeclaredMethods":true
},
{
"name":"org.redisson.RedissonWriteLock",
"allDeclaredMethods":true
},
{
"name":"org.redisson.config.BaseConfig",
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"org.redisson.config.BaseMasterSlaveServersConfig",
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"org.redisson.config.ClusterServersConfig",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.redisson.config.Config",
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"org.redisson.config.ConfigSupport$ClassMixIn",
"allDeclaredMethods":true
},
{
"name":"org.redisson.config.ConfigSupport$ConfigMixIn",
"allDeclaredFields":true,
"allDeclaredMethods":true
},
{
"name":"org.redisson.config.MasterSlaveServersConfig",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.redisson.config.RedissonNodeFileConfig",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.redisson.config.ReplicatedServersConfig",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.redisson.config.SentinelServersConfig",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.redisson.config.SingleServerConfig",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.redisson.executor.RemoteExecutorService",
"methods":[
{"name":"executeCallable","parameterTypes":["org.redisson.executor.params.TaskParameters"] },
{"name":"executeRunnable","parameterTypes":["org.redisson.executor.params.TaskParameters"] },
{"name":"schedule","parameterTypes":["org.redisson.executor.params.ScheduledCronExpressionParameters"] },
{"name":"scheduleAtFixedRate","parameterTypes":["org.redisson.executor.params.ScheduledAtFixedRateParameters"] },
{"name":"scheduleCallable","parameterTypes":["org.redisson.executor.params.ScheduledParameters"] },
{"name":"scheduleRunnable","parameterTypes":["org.redisson.executor.params.ScheduledParameters"] },
{"name":"scheduleWithFixedDelay","parameterTypes":["org.redisson.executor.params.ScheduledWithFixedDelayParameters"] }
]
},
{
"name":"org.redisson.executor.RemoteExecutorServiceAsync",
"allPublicMethods":true
}
]
{
"resources":{
"includes":[
{"pattern":"\\QMETA-INF/MANIFEST.MF\\E"},
{"pattern":"\\QMETA-INF/services/org.jboss.marshalling.ProviderDescriptor\\E"}
]},
"bundles":[]
}
package org.redisson.micronaut;
import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import org.junit.jupiter.api.Test;
import org.redisson.api.RedissonClient;
import org.redisson.micronaut.cache.RedissonSyncCache;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Nikita Koksharov
*
*/
public class RedissonCacheTest {
@Test
public void test1() throws InterruptedException {
Map<String, Object> map = new HashMap<>();
map.put("redisson.threads", "10");
map.put("redisson.single-server-config.address", "redis://127.0.0.1:6379");
// map.put("redisson.clusterServersConfig.scanInterval", "3333");
// map.put("redisson.clusterServersConfig.nodeAddresses", Arrays.asList("redis://127.0.0.2:6379","redis://127.0.0.3:6379"));
map.put("redisson.caches.test.expire-after-write", "10s");
map.put("redisson.caches.test.expire-after-access", "3s");
ApplicationContext ac = ApplicationContext.run(map);
RedissonClient client = ac.getBean(RedissonClient.class);
assertThat(client).isNotNull();
RedissonSyncCache cache = ac.getBean(RedissonSyncCache.class, Qualifiers.byName("test"));
cache.put(1, 2);
Thread.sleep(3500);
assertThat(cache.get(1, Integer.class).isPresent()).isFalse();
cache.put(3, 4);
Thread.sleep(2000);
cache.get(3, Integer.class);
Thread.sleep(2000);
assertThat(cache.get(3, Integer.class).isPresent()).isTrue();
}
}
package org.redisson.micronaut;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.session.Session;
import io.micronaut.session.event.AbstractSessionEvent;
import io.micronaut.session.event.SessionCreatedEvent;
import io.micronaut.session.event.SessionDeletedEvent;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.redisson.api.RedissonClient;
import org.redisson.micronaut.session.RedissonSession;
import org.redisson.micronaut.session.RedissonSessionStore;
import javax.inject.Singleton;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.assertj.core.api.Assertions.*;
/**
* @author Nikita Koksharov
*/
public class RedissonSessionTest {
public static class MyObject implements Serializable {
private String name;
public MyObject() {
}
public MyObject(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
@Singleton
public static class AppListener implements ApplicationEventListener<AbstractSessionEvent> {
List<AbstractSessionEvent> events = new ArrayList<>();
@Override
public void onApplicationEvent(AbstractSessionEvent event) {
events.add(event);
}
public List<AbstractSessionEvent> getEvents() {
return events;
}
}
@Test
public void testWriteBehind() throws ExecutionException, InterruptedException {
Map<String, Object> map = new HashMap<>();
map.put("redisson.threads", "10");
map.put("micronaut.session.http.redisson.enabled", "true");
map.put("micronaut.session.http.redisson.updateMode", "WRITE_BEHIND");
map.put("redisson.singleServerConfig.address", "redis://127.0.0.1:6379");
ApplicationContext ac = ApplicationContext.run(map);
RedissonClient rc = ac.getBean(RedissonClient.class);
rc.getKeys().flushall();
RedissonSessionStore sessionStore = ac.getBean(RedissonSessionStore.class);
RedissonSession session = sessionStore.newSession();
session.put("key1", "oleg");
session.put("key2", new MyObject("myname"));
session.setMaxInactiveInterval(Duration.ofSeconds(30));
RedissonSession saved = sessionStore.save(session).get();
saved.remove("key2");
saved.put("key1", "alba");
RedissonSession s = sessionStore.findSession(saved.getId()).get().get();
assertThat(s.get("key1").get()).isEqualTo("alba");
assertThat(s.contains("key2")).isFalse();
ac.stop();
}
@Test
public void testSessionExpiration() throws ExecutionException, InterruptedException {
Map<String, Object> map = new HashMap<>();
map.put("redisson.threads", "10");
map.put("micronaut.session.http.redisson.enabled", "true");
map.put("redisson.singleServerConfig.address", "redis://127.0.0.1:6379");
ApplicationContext ac = ApplicationContext.run(map);
RedissonClient rc = ac.getBean(RedissonClient.class);
rc.getKeys().flushall();
RedissonSessionStore sessionStore = ac.getBean(RedissonSessionStore.class);
RedissonSession session = sessionStore.newSession();
session.put("username", "oleg");
session.put("foo", new MyObject("myname"));
session.setMaxInactiveInterval(Duration.ofSeconds(30));
RedissonSession saved = sessionStore.save(session).get();
testData(saved);
Thread.sleep(30500);
Optional<RedissonSession> noSession = sessionStore.findSession(saved.getId()).get();
assertThat(noSession).isEmpty();
Thread.sleep(10000);
assertThat(rc.getKeys().count()).isZero();
ac.stop();
}
@Test
public void testSessionCreate() throws ExecutionException, InterruptedException {
Map<String, Object> map = new HashMap<>();
map.put("redisson.threads", "10");
map.put("micronaut.session.http.redisson.enabled", "true");
map.put("redisson.singleServerConfig.address", "redis://127.0.0.1:6379");
ApplicationContext ac = ApplicationContext.run(map);
RedissonClient rc = ac.getBean(RedissonClient.class);
AppListener listener = ac.getBean(AppListener.class);
rc.getKeys().flushall();
RedissonSessionStore sessionStore = ac.getBean(RedissonSessionStore.class);
RedissonSession session = sessionStore.newSession();
session.put("username", "oleg");
session.put("foo", new MyObject("myname"));
RedissonSession saved = sessionStore.save(session).get();
testData(saved);
assertThat(listener.getEvents()).hasSize(1);
assertThat(listener.getEvents().get(0)).isInstanceOf(SessionCreatedEvent.class);
listener.getEvents().clear();
RedissonSession loaded = sessionStore.findSession(saved.getId()).get().get();
testData(loaded);
loaded.put("key", "value");
loaded.remove("username");
loaded.setLastAccessedTime(Instant.now());
loaded.setMaxInactiveInterval(Duration.ofMinutes(1));
sessionStore.save(loaded).get();
assertThat(listener.getEvents()).isEmpty();
loaded = sessionStore.findSession(saved.getId()).get().get();
assertThat(listener.getEvents()).isEmpty();
assertThat(loaded.contains("username")).isFalse();
assertThat(((MyObject) loaded.get("foo").get()).getName()).isEqualTo("myname");
assertThat(loaded.get("key").get()).isEqualTo("value");
assertThat(loaded.isExpired()).isFalse();
assertThat(loaded.getCreationTime().getEpochSecond()).isEqualTo(saved.getCreationTime().getEpochSecond());
assertThat(loaded.getMaxInactiveInterval()).isEqualTo(Duration.ofMinutes(1));
assertThat(loaded.getId()).isEqualTo(saved.getId());
Boolean deleted = sessionStore.deleteSession(saved.getId()).get();
assertThat(deleted).isTrue();
Thread.sleep(1500);
assertThat(listener.getEvents()).hasSize(1);
assertThat(listener.getEvents().get(0)).isInstanceOf(SessionDeletedEvent.class);
Optional<RedissonSession> noSession = sessionStore.findSession(saved.getId()).get();
assertThat(noSession).isEmpty();
Thread.sleep(10000);
assertThat(rc.getKeys().count()).isZero();
ac.stop();
}
private void testData(RedissonSession saved) {
assertThat(saved.get("username").get()).isEqualTo("oleg");
assertThat(((MyObject) saved.get("foo").get()).getName()).isEqualTo("myname");
assertThat(saved.isExpired()).isFalse();
assertThat(saved.getCreationTime()).isNotNull();
assertThat(saved.getMaxInactiveInterval()).isNotNull();
assertThat(saved.getId()).isNotNull();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册