提交 33fdb6a3 编写于 作者: N Nikita Koksharov

Feature - addListener and removeListener methods added to RObject,...

Feature - addListener and removeListener methods added to RObject, RObjectAsync, RObjectReactive, RObjectRx objects. Allows to add and remove listeners for Deleted and Expired keyspace events published by Redis. #220
上级 224a900a
......@@ -1962,6 +1962,8 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
@Override
public void removeListener(int listenerId) {
super.removeListener(listenerId);
RTopic removedTopic = redisson.getTopic(getRemovedChannelName());
removedTopic.removeListener(listenerId);
......
......@@ -26,14 +26,22 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.redisson.api.DeletedObjectListener;
import org.redisson.api.ExpiredObjectListener;
import org.redisson.api.ObjectListener;
import org.redisson.api.RFuture;
import org.redisson.api.RObject;
import org.redisson.api.RPatternTopic;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.CountableListener;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import io.netty.buffer.ByteBuf;
......@@ -383,4 +391,80 @@ public abstract class RedissonObject implements RObject {
return restoreAndReplaceAsync(state, 0, null);
}
@Override
public int addListener(ObjectListener listener) {
if (listener instanceof ExpiredObjectListener) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
return topic.addListener(String.class, new PatternMessageListener<String>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
if (msg.equals(getName())) {
((ExpiredObjectListener) listener).onExpired(msg);
}
}
});
}
if (listener instanceof DeletedObjectListener) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:del");
return topic.addListener(String.class, new PatternMessageListener<String>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
if (msg.equals(getName())) {
((DeletedObjectListener) listener).onDeleted(msg);
}
}
});
}
throw new IllegalArgumentException();
};
@Override
public RFuture<Integer> addListenerAsync(ObjectListener listener) {
if (listener instanceof ExpiredObjectListener) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
return topic.addListenerAsync(String.class, new PatternMessageListener<String>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
if (msg.equals(getName())) {
((ExpiredObjectListener) listener).onExpired(msg);
}
}
});
}
if (listener instanceof DeletedObjectListener) {
RPatternTopic topic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:del");
return topic.addListenerAsync(String.class, new PatternMessageListener<String>() {
@Override
public void onMessage(CharSequence pattern, CharSequence channel, String msg) {
if (msg.equals(getName())) {
((DeletedObjectListener) listener).onDeleted(msg);
}
}
});
}
throw new IllegalArgumentException();
}
@Override
public void removeListener(int listenerId) {
RPatternTopic expiredTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
expiredTopic.removeListener(listenerId);
RPatternTopic deletedTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:del");
deletedTopic.removeListener(listenerId);
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> result = new RedissonPromise<>();
CountableListener<Void> listener = new CountableListener<Void>(result, null, 2);
RPatternTopic expiredTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
expiredTopic.removeListenerAsync(listenerId).onComplete(listener);
RPatternTopic deletedTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:expired");
deletedTopic.removeListenerAsync(listenerId).onComplete(listener);
return result;
}
}
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* Redisson Object Event listener for <b>deleted</b> event published by Redis.
* <p>
* Redis notify-keyspace-events setting should contain Eg letters
*
* @author Nikita Koksharov
*
*/
public interface DeletedObjectListener extends ObjectListener {
/**
* Invoked on deleted event
*
* @param name - name of object
*/
void onDeleted(String name);
}
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
/**
* Redisson Object Event listener for <b>expired</b> event published by Redis.
* <p>
* Redis notify-keyspace-events setting should contain Ex letters
*
* @author Nikita Koksharov
*
*/
public interface ExpiredObjectListener extends ObjectListener {
/**
* Invoked on expired event
*
* @param name - name of object
*/
void onExpired(String name);
}
/**
* Copyright (c) 2013-2019 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api;
import java.util.EventListener;
/**
* Redisson Object Event listener for Expired or Deleted event.
*
* @author Nikita Koksharov
*
*/
public interface ObjectListener extends EventListener {
}
......@@ -161,4 +161,23 @@ public interface RObject extends RObjectAsync {
* @return Codec of object
*/
Codec getCodec();
/**
* Adds object event listener
*
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
int addListener(ObjectListener listener);
/**
* Removes object event listener
*
* @param listenerId - listener id
*/
void removeListener(int listenerId);
}
......@@ -156,4 +156,22 @@ public interface RObjectAsync {
*/
RFuture<Boolean> isExistsAsync();
/**
* Adds object event listener
*
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
RFuture<Integer> addListenerAsync(ObjectListener listener);
/**
* Removes object event listener
*
* @param listenerId - listener id
*/
RFuture<Void> removeListenerAsync(int listenerId);
}
......@@ -156,4 +156,22 @@ public interface RObjectReactive {
*/
Mono<Boolean> isExists();
/**
* Adds object event listener
*
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
Mono<Integer> addListener(ObjectListener listener);
/**
* Removes object event listener
*
* @param listenerId - listener id
*/
Mono<Void> removeListener(int listenerId);
}
......@@ -157,4 +157,22 @@ public interface RObjectRx {
*/
Single<Boolean> isExists();
/**
* Adds object event listener
*
* @see org.redisson.api.ExpiredObjectListener
* @see org.redisson.api.DeletedObjectListener
*
* @param listener - object event listener
* @return listener id
*/
Single<Integer> addListener(ObjectListener listener);
/**
* Removes object event listener
*
* @param listenerId - listener id
*/
Completable removeListener(int listenerId);
}
......@@ -5,19 +5,86 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.RedisRunner.FailedToStartRedisException;
import org.redisson.RedisRunner.KEYSPACE_EVENTS_OPTIONS;
import org.redisson.RedisRunner.RedisProcess;
import org.redisson.api.DeletedObjectListener;
import org.redisson.api.ExpiredObjectListener;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
public class RedissonBucketTest extends BaseTest {
@Test
public void testDeletedListener() throws FailedToStartRedisException, IOException, InterruptedException {
RedisProcess instance = new RedisRunner()
.nosave()
.port(6379)
.randomDir()
.notifyKeyspaceEvents(
KEYSPACE_EVENTS_OPTIONS.E,
KEYSPACE_EVENTS_OPTIONS.g)
.run();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
RBucket<Integer> al = redisson.getBucket("test");
al.set(1);
CountDownLatch latch = new CountDownLatch(1);
al.addListener(new DeletedObjectListener() {
@Override
public void onDeleted(String name) {
latch.countDown();
}
});
al.delete();
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
redisson.shutdown();
instance.stop();
}
@Test
public void testExpiredListener() throws FailedToStartRedisException, IOException, InterruptedException {
RedisProcess instance = new RedisRunner()
.nosave()
.port(6379)
.randomDir()
.notifyKeyspaceEvents(
KEYSPACE_EVENTS_OPTIONS.E,
KEYSPACE_EVENTS_OPTIONS.x)
.run();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
RBucket<Integer> al = redisson.getBucket("test");
al.set(1, 3, TimeUnit.SECONDS);
CountDownLatch latch = new CountDownLatch(1);
al.addListener(new ExpiredObjectListener() {
@Override
public void onExpired(String name) {
latch.countDown();
}
});
assertThat(latch.await(4, TimeUnit.SECONDS)).isTrue();
redisson.shutdown();
instance.stop();
}
@Test
public void testSizeInMemory() {
Assume.assumeTrue(RedisRunner.getDefaultRedisServerInstance().getRedisVersion().compareTo("4.0.0") > 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册