提交 1dcce68d 编写于 作者: N Nikita Koksharov

Fixed - RPatternTopic on keyspace/keyevent notification subscribes only to...

Fixed - RPatternTopic on keyspace/keyevent notification subscribes only to single master node in Redis cluster #2237.
上级 281a3363
......@@ -18,7 +18,6 @@ package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
......@@ -30,6 +29,7 @@ import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
......@@ -50,7 +50,7 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
......@@ -80,9 +80,9 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
......@@ -117,8 +117,8 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
doUnsubscribe(false, getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, getPatterns().toArray(new byte[getPatterns().size()][]));
}
}
......@@ -18,7 +18,6 @@ package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
......@@ -30,6 +29,7 @@ import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
......@@ -50,7 +50,7 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
......@@ -80,9 +80,9 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
......@@ -117,8 +117,8 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
doUnsubscribe(false, getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, getPatterns().toArray(new byte[getPatterns().size()][]));
}
}
......@@ -18,7 +18,6 @@ package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
......@@ -30,6 +29,7 @@ import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
......@@ -50,7 +50,7 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
......@@ -80,9 +80,9 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
......@@ -117,8 +117,8 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
doUnsubscribe(false, getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, getPatterns().toArray(new byte[getPatterns().size()][]));
}
}
......@@ -18,7 +18,6 @@ package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
......@@ -30,6 +29,7 @@ import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
......@@ -50,7 +50,7 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
......@@ -74,15 +74,15 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doUnsubscribe(boolean all, byte[]... channels) {
for (byte[] channel : channels) {
subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE);
subscribeService.unsubscribe(new ChannelName(channel), PubSubType.UNSUBSCRIBE);
}
}
@Override
protected void doPsubscribe(byte[]... patterns) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
......@@ -117,8 +117,8 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
doUnsubscribe(false, getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, getPatterns().toArray(new byte[getPatterns().size()][]));
}
}
......@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
......@@ -84,7 +85,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, PubSubConnectionEntry> patterns = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
......@@ -129,7 +130,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
ChannelName cn = toChannelName(channel);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(cn, res));
f.onComplete(listener);
}
......@@ -187,10 +188,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.patterns) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.patterns.remove(cn);
}
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.patterns.get(cn);
entries.stream()
.filter(en -> en.hasListeners(cn))
.forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn));
}
});
f.onComplete(listener);
......@@ -218,7 +219,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
return flux.get();
}
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.<Message<ByteBuffer, ByteBuffer>>create(emitter -> {
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.create(emitter -> {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
......@@ -259,16 +260,20 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
}
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
}
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
emitter.onDispose(disposable);
......
......@@ -18,7 +18,6 @@ package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
......@@ -30,6 +29,7 @@ import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
......@@ -50,7 +50,7 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
......@@ -80,9 +80,9 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
......@@ -117,8 +117,8 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
doUnsubscribe(false, getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, getPatterns().toArray(new byte[getPatterns().size()][]));
}
}
......@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
......@@ -84,7 +85,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, PubSubConnectionEntry> patterns = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
......@@ -129,7 +130,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
ChannelName cn = toChannelName(channel);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(cn, res));
f.onComplete(listener);
}
......@@ -187,10 +188,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.patterns) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.patterns.remove(cn);
}
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.patterns.get(cn);
entries.stream()
.filter(en -> en.hasListeners(cn))
.forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn));
}
});
f.onComplete(listener);
......@@ -218,7 +219,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
return flux.get();
}
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.<Message<ByteBuffer, ByteBuffer>>create(emitter -> {
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.create(emitter -> {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
......@@ -259,16 +260,20 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
}
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
}
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
emitter.onDispose(disposable);
......
......@@ -18,7 +18,6 @@ package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
......@@ -30,6 +29,7 @@ import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
......@@ -50,7 +50,7 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
......@@ -80,9 +80,9 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
......@@ -117,8 +117,8 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
doUnsubscribe(false, getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, getPatterns().toArray(new byte[getPatterns().size()][]));
}
}
......@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
......@@ -84,7 +85,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, PubSubConnectionEntry> patterns = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
......@@ -129,7 +130,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
ChannelName cn = toChannelName(channel);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(cn, res));
f.onComplete(listener);
}
......@@ -187,10 +188,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.patterns) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.patterns.remove(cn);
}
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.patterns.get(cn);
entries.stream()
.filter(en -> en.hasListeners(cn))
.forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn));
}
});
f.onComplete(listener);
......@@ -218,7 +219,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
return flux.get();
}
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.<Message<ByteBuffer, ByteBuffer>>create(emitter -> {
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.create(emitter -> {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
......@@ -259,16 +260,20 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
}
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
}
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
emitter.onDispose(disposable);
......
......@@ -18,7 +18,6 @@ package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
......@@ -30,6 +29,7 @@ import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
......@@ -50,7 +50,7 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
......@@ -80,9 +80,9 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
......@@ -117,8 +117,8 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
doUnsubscribe(false, getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, getPatterns().toArray(new byte[getPatterns().size()][]));
}
}
......@@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
......@@ -84,7 +85,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
}
private final Map<ChannelName, PubSubConnectionEntry> channels = new ConcurrentHashMap<>();
private final Map<ChannelName, PubSubConnectionEntry> patterns = new ConcurrentHashMap<>();
private final Map<ChannelName, Collection<PubSubConnectionEntry>> patterns = new ConcurrentHashMap<>();
private final ListenableCounter monosListener = new ListenableCounter();
......@@ -129,7 +130,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
CountableListener<Void> listener = new CountableListener<>(result, null, patterns.length);
for (ByteBuffer channel : patterns) {
ChannelName cn = toChannelName(channel);
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(cn, ByteArrayCodec.INSTANCE);
f.onComplete((res, e) -> RedissonReactiveSubscription.this.patterns.put(cn, res));
f.onComplete(listener);
}
......@@ -187,10 +188,10 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
RFuture<Codec> f = subscribeService.unsubscribe(cn, PubSubType.PUNSUBSCRIBE);
f.onComplete((res, e) -> {
synchronized (RedissonReactiveSubscription.this.patterns) {
PubSubConnectionEntry entry = RedissonReactiveSubscription.this.patterns.get(cn);
if (!entry.hasListeners(cn)) {
RedissonReactiveSubscription.this.patterns.remove(cn);
}
Collection<PubSubConnectionEntry> entries = RedissonReactiveSubscription.this.patterns.get(cn);
entries.stream()
.filter(en -> en.hasListeners(cn))
.forEach(ee -> RedissonReactiveSubscription.this.patterns.remove(cn));
}
});
f.onComplete(listener);
......@@ -218,7 +219,7 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
return flux.get();
}
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.<Message<ByteBuffer, ByteBuffer>>create(emitter -> {
Flux<Message<ByteBuffer, ByteBuffer>> f = Flux.create(emitter -> {
emitter.onRequest(n -> {
monosListener.addListener(() -> {
......@@ -259,16 +260,20 @@ public class RedissonReactiveSubscription implements ReactiveSubscription {
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
}
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().removeListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.removeListener(entry.getKey(), listener);
}
}
};
for (Entry<ChannelName, PubSubConnectionEntry> entry : channels.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
}
for (Entry<ChannelName, PubSubConnectionEntry> entry : patterns.entrySet()) {
entry.getValue().addListener(entry.getKey(), listener);
for (Entry<ChannelName, Collection<PubSubConnectionEntry>> entry : patterns.entrySet()) {
for (PubSubConnectionEntry pubSubConnectionEntry : entry.getValue()) {
pubSubConnectionEntry.addListener(entry.getKey(), listener);
}
}
emitter.onDispose(disposable);
......
......@@ -18,7 +18,6 @@ package org.redisson.spring.data.connection;
import org.redisson.api.RFuture;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.ChannelName;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.connection.ConnectionManager;
......@@ -30,6 +29,7 @@ import org.springframework.data.redis.connection.util.AbstractSubscription;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
......@@ -50,7 +50,7 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doSubscribe(byte[]... channels) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : channels) {
RFuture<PubSubConnectionEntry> f = subscribeService.subscribe(ByteArrayCodec.INSTANCE, new ChannelName(channel), new BaseRedisPubSubListener() {
@Override
......@@ -80,9 +80,9 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doPsubscribe(byte[]... patterns) {
List<RFuture<?>> list = new ArrayList<RFuture<?>>();
List<RFuture<?>> list = new ArrayList<>();
for (byte[] channel : patterns) {
RFuture<PubSubConnectionEntry> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
RFuture<Collection<PubSubConnectionEntry>> f = subscribeService.psubscribe(new ChannelName(channel), ByteArrayCodec.INSTANCE, new BaseRedisPubSubListener() {
@Override
public void onPatternMessage(CharSequence pattern, CharSequence ch, Object message) {
if (!Arrays.equals(((ChannelName) pattern).getName(), channel)) {
......@@ -117,8 +117,8 @@ public class RedissonSubscription extends AbstractSubscription {
@Override
protected void doClose() {
doUnsubscribe(false, (byte[][]) getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, (byte[][]) getPatterns().toArray(new byte[getPatterns().size()][]));
doUnsubscribe(false, getChannels().toArray(new byte[getChannels().size()][]));
doPUnsubscribe(false, getPatterns().toArray(new byte[getPatterns().size()][]));
}
}
......@@ -33,6 +33,11 @@ public class PubSubPatternStatusListener implements RedisPubSubListener<Object>
return name;
}
public PubSubPatternStatusListener(PubSubPatternStatusListener l) {
this.listener = l.listener;
this.name = l.name;
}
public PubSubPatternStatusListener(PatternStatusListener listener, String name) {
super();
this.listener = listener;
......
......@@ -32,6 +32,7 @@ import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.PubSubConnectionEntry;
import org.redisson.pubsub.PublishSubscribeService;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
......@@ -73,7 +74,7 @@ public class RedissonPatternTopic implements RPatternTopic {
}
private int addListener(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = subscribeService.psubscribe(channelName, codec, pubSubListener);
RFuture<Collection<PubSubConnectionEntry>> future = subscribeService.psubscribe(channelName, codec, pubSubListener);
commandExecutor.syncSubscription(future);
return System.identityHashCode(pubSubListener);
}
......@@ -91,7 +92,7 @@ public class RedissonPatternTopic implements RPatternTopic {
}
private RFuture<Integer> addListenerAsync(RedisPubSubListener<?> pubSubListener) {
RFuture<PubSubConnectionEntry> future = subscribeService.psubscribe(channelName, codec, pubSubListener);
RFuture<Collection<PubSubConnectionEntry>> future = subscribeService.psubscribe(channelName, codec, pubSubListener);
RPromise<Integer> result = new RedissonPromise<Integer>();
future.onComplete((res, e) -> {
if (e != null) {
......@@ -134,10 +135,9 @@ public class RedissonPatternTopic implements RPatternTopic {
}
if (entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
subscribeService.unsubscribe(PubSubType.PUNSUBSCRIBE, channelName).syncUninterruptibly();
}
semaphore.release();
}
@Override
......
......@@ -145,10 +145,9 @@ public class RedissonTopic implements RTopic {
}
if (entry.hasListeners(channelName)) {
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName, semaphore).syncUninterruptibly();
} else {
semaphore.release();
subscribeService.unsubscribe(PubSubType.UNSUBSCRIBE, channelName).syncUninterruptibly();
}
semaphore.release();
}
protected void acquire(AsyncSemaphore semaphore) {
......
......@@ -53,7 +53,10 @@ abstract class PublishSubscribe<E extends PubSubEntry<E>> {
if (!removed) {
throw new IllegalStateException();
}
service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName), semaphore);
service.unsubscribe(PubSubType.UNSUBSCRIBE, new ChannelName(channelName))
.onComplete((r, e) -> {
semaphore.release();
});
} else {
semaphore.release();
}
......
package org.redisson;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
......@@ -16,6 +18,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.redisson.RedisRunner.RedisProcess;
......@@ -25,7 +28,9 @@ import org.redisson.api.RedissonClient;
import org.redisson.api.listener.BasePatternStatusListener;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.PatternStatusListener;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;
import org.redisson.connection.balancer.RandomLoadBalancer;
public class RedissonTopicPatternTest extends BaseTest {
......@@ -66,7 +71,131 @@ public class RedissonTopicPatternTest extends BaseTest {
return "Message{" + "name='" + name + '\'' + '}';
}
}
@Test
public void testCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave().notifyKeyspaceEvents(
RedisRunner.KEYSPACE_EVENTS_OPTIONS.E,
RedisRunner.KEYSPACE_EVENTS_OPTIONS.g);
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Thread.sleep(3000);
Config config = new Config();
config.useClusterServers()
.setPingConnectionInterval(0)
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
AtomicInteger subscribeCounter = new AtomicInteger();
RPatternTopic topic = redisson.getPatternTopic("__keyevent@*", StringCodec.INSTANCE);
topic.addListener(new PatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
subscribeCounter.incrementAndGet();
}
@Override
public void onPUnsubscribe(String pattern) {
System.out.println("onPUnsubscribe: " + pattern);
}
});
AtomicInteger counter = new AtomicInteger();
PatternMessageListener<String> listener = (pattern, channel, msg) -> {
System.out.println("mes " + channel + " counter " + counter.get());
counter.incrementAndGet();
};
topic.addListener(String.class, listener);
for (int i = 0; i < 10; i++) {
redisson.getBucket("" + i).set(i);
redisson.getBucket("" + i).delete();
Thread.sleep(7);
}
Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() > 9);
assertThat(subscribeCounter.get()).isEqualTo(1);
redisson.shutdown();
process.shutdown();
}
@Test
public void testNonEventMessagesInCluster() throws IOException, InterruptedException {
RedisRunner master1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner master3 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave1 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave2 = new RedisRunner().randomPort().randomDir().nosave();
RedisRunner slave3 = new RedisRunner().randomPort().randomDir().nosave();
ClusterRunner clusterRunner = new ClusterRunner()
.addNode(master1, slave1)
.addNode(master2, slave2)
.addNode(master3, slave3);
ClusterRunner.ClusterProcesses process = clusterRunner.run();
Config config = new Config();
config.useClusterServers()
.setLoadBalancer(new RandomLoadBalancer())
.addNodeAddress(process.getNodes().stream().findAny().get().getRedisServerAddressAndPort());
RedissonClient redisson = Redisson.create(config);
AtomicInteger subscribeCounter = new AtomicInteger();
RPatternTopic topic = redisson.getPatternTopic("my*", StringCodec.INSTANCE);
topic.addListener(new PatternStatusListener() {
@Override
public void onPSubscribe(String pattern) {
subscribeCounter.incrementAndGet();
}
@Override
public void onPUnsubscribe(String pattern) {
System.out.println("onPUnsubscribe: " + pattern);
}
});
AtomicInteger counter = new AtomicInteger();
PatternMessageListener<String> listener = (pattern, channel, msg) -> {
counter.incrementAndGet();
};
topic.addListener(String.class, listener);
for (int i = 0; i < 100; i++) {
redisson.getTopic("my" + i).publish(123);
}
Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> counter.get() == 100);
assertThat(subscribeCounter.get()).isEqualTo(1);
redisson.shutdown();
process.shutdown();
}
@Test
public void testMultiType() throws InterruptedException {
RPatternTopic topic1 = redisson.getPatternTopic("topic1.*");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册