提交 696a0005 编写于 作者: N Nikita Koksharov

Fixed - CROSSSLOT error when clearing a redis-spring-data cache #3153

上级 d54f31db
......@@ -27,9 +27,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
......@@ -38,7 +43,10 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
......@@ -440,4 +448,67 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return false;
}
private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
AtomicLong count, AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
list.add(key);
}
RPromise<Long> result = new RedissonPromise<>();
AtomicReference<Throwable> failed = new AtomicReference<>();
AtomicLong count = new AtomicLong();
AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
if (u == null) {
List<Long> result1 = (List<Long>) r.getResponses();
for (Long res : result1) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}
}
......@@ -21,10 +21,7 @@ import org.springframework.data.redis.connection.RedisNode.NodeType;
import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
......@@ -181,6 +178,16 @@ public class RedissonClusterConnectionTest {
assertThat(info.size()).isGreaterThan(10);
}
@Test
public void testDel() {
List<byte[]> keys = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] key = ("test" + i).getBytes();
keys.add(key);
connection.set(key, ("test" + i).getBytes());
}
connection.del(keys.toArray(new byte[0][]));
}
@Test
public void testResetConfigStats() {
......
......@@ -27,9 +27,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
......@@ -38,7 +43,10 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
......@@ -380,7 +388,6 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
@Override
public void rename(byte[] oldName, byte[] newName) {
if (isPipelined()) {
throw new InvalidDataAccessResourceUsageException("Clustered rename is not supported in a pipeline");
}
......@@ -440,4 +447,67 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return false;
}
private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
AtomicLong count, AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
list.add(key);
}
RPromise<Long> result = new RedissonPromise<>();
AtomicReference<Throwable> failed = new AtomicReference<>();
AtomicLong count = new AtomicLong();
AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
if (u == null) {
List<Long> result1 = (List<Long>) r.getResponses();
for (Long res : result1) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}
}
......@@ -26,9 +26,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
......@@ -37,7 +42,10 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
......@@ -444,4 +452,67 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
return false;
}
private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
AtomicLong count, AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
list.add(key);
}
RPromise<Long> result = new RedissonPromise<>();
AtomicReference<Throwable> failed = new AtomicReference<>();
AtomicLong count = new AtomicLong();
AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
if (u == null) {
List<Long> result1 = (List<Long>) r.getResponses();
for (Long res : result1) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}
}
......@@ -26,10 +26,15 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
......@@ -39,7 +44,10 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
......@@ -493,4 +501,67 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return false;
}
private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
AtomicLong count, AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
list.add(key);
}
RPromise<Long> result = new RedissonPromise<>();
AtomicReference<Throwable> failed = new AtomicReference<>();
AtomicLong count = new AtomicLong();
AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
if (u == null) {
List<Long> result1 = (List<Long>) r.getResponses();
for (Long res : result1) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}
}
......@@ -26,10 +26,15 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
......@@ -39,7 +44,10 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
......@@ -493,4 +501,67 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return false;
}
private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
AtomicLong count, AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
list.add(key);
}
RPromise<Long> result = new RedissonPromise<>();
AtomicReference<Throwable> failed = new AtomicReference<>();
AtomicLong count = new AtomicLong();
AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
if (u == null) {
List<Long> result1 = (List<Long>) r.getResponses();
for (Long res : result1) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}
}
......@@ -26,10 +26,15 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
......@@ -39,7 +44,10 @@ import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
......@@ -493,4 +501,67 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
return false;
}
private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
AtomicLong count, AtomicLong executed) {
if (executed.decrementAndGet() == 0) {
if (failed.get() != null) {
if (count.get() > 0) {
RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
result.tryFailure(ex);
} else {
result.tryFailure(failed.get());
}
} else {
result.trySuccess(count.get());
}
}
}
private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
for (byte[] key : keys) {
int slot = executorService.getConnectionManager().calcSlot(key);
MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
list.add(key);
}
RPromise<Long> result = new RedissonPromise<>();
AtomicReference<Throwable> failed = new AtomicReference<>();
AtomicLong count = new AtomicLong();
AtomicLong executed = new AtomicLong(range2key.size());
BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
if (u == null) {
List<Long> result1 = (List<Long>) r.getResponses();
for (Long res : result1) {
if (res != null) {
count.addAndGet(res);
}
}
} else {
failed.set(u);
}
checkExecution(result, failed, count, executed);
};
for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
CommandBatchService es = new CommandBatchService(executorService.getConnectionManager());
for (byte[] key : entry.getValue()) {
es.writeAsync(entry.getKey(), null, command, key);
}
RFuture<BatchResult<?>> future = es.executeAsync();
future.onComplete(listener);
}
return result;
}
@Override
public Long del(byte[]... keys) {
RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
return sync(f);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册