提交 57266be2 编写于 作者: N Nikita Koksharov

Feature - add ScoredSortedSetAddListener to RScoredSortedSet object #3376

上级 eb6a9126
......@@ -15,10 +15,8 @@
*/
package org.redisson;
import org.redisson.api.RFuture;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.api.SortOrder;
import org.redisson.api.*;
import org.redisson.api.listener.ScoredSortedSetAddListener;
import org.redisson.api.mapreduce.RCollectionMapReduce;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec;
......@@ -33,6 +31,8 @@ import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.iterator.RedissonBaseIterator;
import org.redisson.mapreduce.RedissonCollectionMapReduce;
import org.redisson.misc.CountableListener;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import java.math.BigDecimal;
......@@ -1426,4 +1426,40 @@ public class RedissonScoredSortedSet<V> extends RedissonExpirable implements RSc
public Stream<V> stream(String pattern, int count) {
return toStream(iterator(pattern, count));
}
@Override
public int addListener(ObjectListener listener) {
if (listener instanceof ScoredSortedSetAddListener) {
return addListener("__keyevent@*:zadd", (ScoredSortedSetAddListener) listener, ScoredSortedSetAddListener::onAdd);
}
return super.addListener(listener);
};
@Override
public RFuture<Integer> addListenerAsync(ObjectListener listener) {
if (listener instanceof ScoredSortedSetAddListener) {
return addListenerAsync("__keyevent@*:zadd", (ScoredSortedSetAddListener) listener, ScoredSortedSetAddListener::onAdd);
}
return super.addListenerAsync(listener);
}
@Override
public void removeListener(int listenerId) {
RPatternTopic expiredTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:zadd");
expiredTopic.removeListener(listenerId);
super.removeListener(listenerId);
}
@Override
public RFuture<Void> removeListenerAsync(int listenerId) {
RPromise<Void> result = new RedissonPromise<>();
CountableListener<Void> listener = new CountableListener<>(result, null, 3);
RPatternTopic setTopic = new RedissonPatternTopic(StringCodec.INSTANCE, commandExecutor, "__keyevent@*:zadd");
setTopic.removeListenerAsync(listenerId).onComplete(listener);
removeListenersAsync(listenerId, listener);
return result;
}
}
/**
* Copyright (c) 2013-2020 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.api.listener;
import org.redisson.api.ObjectListener;
/**
* Redisson Object Event listener for <b>add</b> event published by RScoredSortedSet object.
* <p>
* Redis notify-keyspace-events setting should contain Ez letters
*
* @author Nikita Koksharov
*
*/
public interface ScoredSortedSetAddListener extends ObjectListener {
/**
* Invoked when entry added to RScoredSortedSet object
*
* @param name - name of object
*/
void onAdd(String name);
}
......@@ -2,6 +2,7 @@ package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
......@@ -12,6 +13,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -19,15 +21,13 @@ import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.redisson.api.RFuture;
import org.redisson.api.RLexSortedSet;
import org.redisson.api.RList;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RSortedSet;
import org.redisson.api.SortOrder;
import org.redisson.api.*;
import org.redisson.api.listener.ScoredSortedSetAddListener;
import org.redisson.api.listener.SetObjectListener;
import org.redisson.client.codec.IntegerCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.config.Config;
public class RedissonScoredSortedSetTest extends BaseTest {
......@@ -1358,6 +1358,38 @@ public class RedissonScoredSortedSetTest extends BaseTest {
assertThat(out.readAll()).isEmpty();
}
@Test
public void testAddListener() throws RedisRunner.FailedToStartRedisException, IOException, InterruptedException {
RedisRunner.RedisProcess instance = new RedisRunner()
.nosave()
.randomPort()
.randomDir()
.notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.z)
.run();
Config config = new Config();
config.useSingleServer().setAddress(instance.getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
RScoredSortedSet<Integer> ss = redisson.getScoredSortedSet("test");
CountDownLatch latch = new CountDownLatch(1);
ss.addListener(new ScoredSortedSetAddListener() {
@Override
public void onAdd(String name) {
latch.countDown();
}
});
ss.add(1, 1);
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
redisson.shutdown();
instance.stop();
}
@Test
public void testIntersectionWithWeight() {
RScoredSortedSet<String> set1 = redisson.getScoredSortedSet("simple1");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册