Commit 7c6054e9 authored by Nikita Koksharov


Fixed - mGet() and mSet() methods of RedissonConnection object in Spring Data module throw CROSSSLOT error. #3582
Parent 630d33ee
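For context, the sketch below is not part of the commit; it illustrates the kind of multi-key access this fix targets, assuming a RedisClusterConnection obtained from a RedissonConnectionFactory. The class, method, and key names are illustrative only; keys without a common hash tag land in different cluster slots, which is what used to surface the CROSSSLOT error from mGet() and mSet().

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.springframework.data.redis.connection.RedisClusterConnection;

    public class CrossSlotSketch {

        // Exercises mSet()/mGet() across keys that hash to different slots.
        public static void exercise(RedisClusterConnection connection) {
            Map<byte[], byte[]> entries = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                // No hash tag, so the keys spread over many slots.
                entries.put(("test" + i).getBytes(), ("value" + i).getBytes());
            }

            // Before this commit these calls failed with CROSSSLOT on a cluster,
            // presumably because the keys were sent in one multi-key command.
            // With the fix, each key is dispatched as an individual SET/GET
            // inside a single batch, so routing happens per slot.
            connection.mSet(entries);
            List<byte[]> values = connection.mGet(entries.keySet().toArray(new byte[0][]));
            System.out.println("fetched " + values.size() + " values");
        }
    }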
@@ -15,26 +15,10 @@
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import io.netty.util.CharsetUtil;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@@ -46,8 +30,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
@@ -57,7 +39,10 @@ import org.springframework.data.redis.connection.convert.StringToRedisClientInfo
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
*
@@ -449,75 +434,54 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
        return false;
    }
-    private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
-            AtomicLong count, AtomicLong executed) {
-        if (executed.decrementAndGet() == 0) {
-            if (failed.get() != null) {
-                if (count.get() > 0) {
-                    RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
-                    result.tryFailure(ex);
-                } else {
-                    result.tryFailure(failed.get());
-                }
-            } else {
-                result.trySuccess(count.get());
-            }
-        }
-    }
-    private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
-        Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
-        for (byte[] key : keys) {
-            int slot = executorService.getConnectionManager().calcSlot(key);
-            MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
-            List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
-            list.add(key);
-        }
-        RPromise<Long> result = new RedissonPromise<>();
-        AtomicReference<Throwable> failed = new AtomicReference<>();
-        AtomicLong count = new AtomicLong();
-        AtomicLong executed = new AtomicLong(range2key.size());
-        BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
-            if (u == null) {
-                List<Long> result1 = (List<Long>) r.getResponses();
-                for (Long res : result1) {
-                    if (res != null) {
-                        count.addAndGet(res);
-                    }
-                }
-            } else {
-                failed.set(u);
-            }
-            checkExecution(result, failed, count, executed);
-        };
-        for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
-            CommandBatchService es = new CommandBatchService(executorService);
-            for (byte[] key : entry.getValue()) {
-                es.writeAsync(entry.getKey(), null, command, key);
-            }
-            RFuture<BatchResult<?>> future = es.executeAsync();
-            future.onComplete(listener);
-        }
-        return result;
-    }
-    @Override
-    public Long del(byte[]... keys) {
-        if (isQueueing() || isPipelined()) {
-            for (byte[] key: keys) {
-                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
-            }
-            return null;
-        }
-        RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
-        return sync(f);
-    }
+    @Override
+    public Long del(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key: keys) {
+                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.writeAsync(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+        }
+        BatchResult<Long> b = (BatchResult<Long>) es.execute();
+        return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum();
+    }
+    @Override
+    public List<byte[]> mGet(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key : keys) {
+                read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+        }
+        BatchResult<byte[]> r = (BatchResult<byte[]>) es.execute();
+        return r.getResponses();
+    }
+    @Override
+    public void mSet(Map<byte[], byte[]> tuple) {
+        if (isQueueing() || isPipelined()) {
+            for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+                write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+            }
+            return;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+            es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+        }
+        es.execute();
+    }
}
@@ -186,9 +186,32 @@ public class RedissonClusterConnectionTest {
keys.add(key);
connection.set(key, ("test" + i).getBytes());
}
-        connection.del(keys.toArray(new byte[0][]));
+        assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10);
}
@Test
public void testMSet() {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue());
}
}
@Test
public void testMGet() {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
List<byte[]> r = connection.mGet(map.keySet().toArray(new byte[0][]));
assertThat(r).containsExactly(map.values().toArray(new byte[0][]));
}
@Test
public void testResetConfigStats() {
RedisClusterNode master = getFirstMaster();
......
@@ -15,26 +15,10 @@
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import io.netty.util.CharsetUtil;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@@ -46,8 +30,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
@@ -57,7 +39,10 @@ import org.springframework.data.redis.connection.convert.StringToRedisClientInfo
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
*
@@ -448,75 +433,54 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
        return false;
    }
-    private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
-            AtomicLong count, AtomicLong executed) {
-        if (executed.decrementAndGet() == 0) {
-            if (failed.get() != null) {
-                if (count.get() > 0) {
-                    RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
-                    result.tryFailure(ex);
-                } else {
-                    result.tryFailure(failed.get());
-                }
-            } else {
-                result.trySuccess(count.get());
-            }
-        }
-    }
-    private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
-        Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
-        for (byte[] key : keys) {
-            int slot = executorService.getConnectionManager().calcSlot(key);
-            MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
-            List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
-            list.add(key);
-        }
-        RPromise<Long> result = new RedissonPromise<>();
-        AtomicReference<Throwable> failed = new AtomicReference<>();
-        AtomicLong count = new AtomicLong();
-        AtomicLong executed = new AtomicLong(range2key.size());
-        BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
-            if (u == null) {
-                List<Long> result1 = (List<Long>) r.getResponses();
-                for (Long res : result1) {
-                    if (res != null) {
-                        count.addAndGet(res);
-                    }
-                }
-            } else {
-                failed.set(u);
-            }
-            checkExecution(result, failed, count, executed);
-        };
-        for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
-            CommandBatchService es = new CommandBatchService(executorService);
-            for (byte[] key : entry.getValue()) {
-                es.writeAsync(entry.getKey(), null, command, key);
-            }
-            RFuture<BatchResult<?>> future = es.executeAsync();
-            future.onComplete(listener);
-        }
-        return result;
-    }
-    @Override
-    public Long del(byte[]... keys) {
-        if (isQueueing() || isPipelined()) {
-            for (byte[] key: keys) {
-                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
-            }
-            return null;
-        }
-        RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
-        return sync(f);
-    }
+    @Override
+    public Long del(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key: keys) {
+                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.writeAsync(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+        }
+        BatchResult<Long> b = (BatchResult<Long>) es.execute();
+        return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum();
+    }
+    @Override
+    public List<byte[]> mGet(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key : keys) {
+                read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+        }
+        BatchResult<byte[]> r = (BatchResult<byte[]>) es.execute();
+        return r.getResponses();
+    }
+    @Override
+    public void mSet(Map<byte[], byte[]> tuple) {
+        if (isQueueing() || isPipelined()) {
+            for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+                write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+            }
+            return;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+            es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+        }
+        es.execute();
+    }
}
@@ -15,25 +15,10 @@
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import io.netty.util.CharsetUtil;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@@ -45,8 +30,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterConnection;
@@ -57,7 +40,10 @@ import org.springframework.data.redis.connection.convert.StringToRedisClientInfo
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
*
@@ -453,75 +439,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Red
        return false;
    }
-    private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
-            AtomicLong count, AtomicLong executed) {
-        if (executed.decrementAndGet() == 0) {
-            if (failed.get() != null) {
-                if (count.get() > 0) {
-                    RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
-                    result.tryFailure(ex);
-                } else {
-                    result.tryFailure(failed.get());
-                }
-            } else {
-                result.trySuccess(count.get());
-            }
-        }
-    }
-    private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
-        Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
-        for (byte[] key : keys) {
-            int slot = executorService.getConnectionManager().calcSlot(key);
-            MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
-            List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
-            list.add(key);
-        }
-        RPromise<Long> result = new RedissonPromise<>();
-        AtomicReference<Throwable> failed = new AtomicReference<>();
-        AtomicLong count = new AtomicLong();
-        AtomicLong executed = new AtomicLong(range2key.size());
-        BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
-            if (u == null) {
-                List<Long> result1 = (List<Long>) r.getResponses();
-                for (Long res : result1) {
-                    if (res != null) {
-                        count.addAndGet(res);
-                    }
-                }
-            } else {
-                failed.set(u);
-            }
-            checkExecution(result, failed, count, executed);
-        };
-        for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
-            CommandBatchService es = new CommandBatchService(executorService);
-            for (byte[] key : entry.getValue()) {
-                es.writeAsync(entry.getKey(), null, command, key);
-            }
-            RFuture<BatchResult<?>> future = es.executeAsync();
-            future.onComplete(listener);
-        }
-        return result;
-    }
-    @Override
-    public Long del(byte[]... keys) {
-        if (isQueueing() || isPipelined()) {
-            for (byte[] key: keys) {
-                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
-            }
-            return null;
-        }
-        RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
-        return sync(f);
-    }
+    @Override
+    public Long del(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key: keys) {
+                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.writeAsync(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+        }
+        BatchResult<Long> b = (BatchResult<Long>) es.execute();
+        return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum();
+    }
+    @Override
+    public List<byte[]> mGet(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key : keys) {
+                read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+        }
+        BatchResult<byte[]> r = (BatchResult<byte[]>) es.execute();
+        return r.getResponses();
+    }
+    @Override
+    public Boolean mSet(Map<byte[], byte[]> tuple) {
+        if (isQueueing() || isPipelined()) {
+            for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+                write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+            }
+            return true;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+            es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+        }
+        es.execute();
+        return true;
+    }
}
@@ -15,26 +15,11 @@
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import io.netty.util.CharsetUtil;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@@ -47,8 +32,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
@@ -63,7 +46,10 @@ import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
*
@@ -502,75 +488,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
        return false;
    }
-    private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
-            AtomicLong count, AtomicLong executed) {
-        if (executed.decrementAndGet() == 0) {
-            if (failed.get() != null) {
-                if (count.get() > 0) {
-                    RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
-                    result.tryFailure(ex);
-                } else {
-                    result.tryFailure(failed.get());
-                }
-            } else {
-                result.trySuccess(count.get());
-            }
-        }
-    }
-    private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
-        Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
-        for (byte[] key : keys) {
-            int slot = executorService.getConnectionManager().calcSlot(key);
-            MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
-            List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
-            list.add(key);
-        }
-        RPromise<Long> result = new RedissonPromise<>();
-        AtomicReference<Throwable> failed = new AtomicReference<>();
-        AtomicLong count = new AtomicLong();
-        AtomicLong executed = new AtomicLong(range2key.size());
-        BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
-            if (u == null) {
-                List<Long> result1 = (List<Long>) r.getResponses();
-                for (Long res : result1) {
-                    if (res != null) {
-                        count.addAndGet(res);
-                    }
-                }
-            } else {
-                failed.set(u);
-            }
-            checkExecution(result, failed, count, executed);
-        };
-        for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
-            CommandBatchService es = new CommandBatchService(executorService);
-            for (byte[] key : entry.getValue()) {
-                es.writeAsync(entry.getKey(), null, command, key);
-            }
-            RFuture<BatchResult<?>> future = es.executeAsync();
-            future.onComplete(listener);
-        }
-        return result;
-    }
-    @Override
-    public Long del(byte[]... keys) {
-        if (isQueueing() || isPipelined()) {
-            for (byte[] key: keys) {
-                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
-            }
-            return null;
-        }
-        RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
-        return sync(f);
-    }
+    @Override
+    public Long del(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key: keys) {
+                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.writeAsync(key, StringCodec.INSTANCE, RedisCommands.DEL, key);
+        }
+        BatchResult<Long> b = (BatchResult<Long>) es.execute();
+        return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum();
+    }
+    @Override
+    public List<byte[]> mGet(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key : keys) {
+                read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+        }
+        BatchResult<byte[]> r = (BatchResult<byte[]>) es.execute();
+        return r.getResponses();
+    }
+    @Override
+    public Boolean mSet(Map<byte[], byte[]> tuple) {
+        if (isQueueing() || isPipelined()) {
+            for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+                write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+            }
+            return true;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+            es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+        }
+        es.execute();
+        return true;
+    }
}
@@ -15,26 +15,11 @@
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import io.netty.util.CharsetUtil;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@@ -47,8 +32,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
@@ -63,7 +46,10 @@ import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
*
@@ -502,75 +488,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
        return false;
    }
-    private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
-            AtomicLong count, AtomicLong executed) {
-        if (executed.decrementAndGet() == 0) {
-            if (failed.get() != null) {
-                if (count.get() > 0) {
-                    RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
-                    result.tryFailure(ex);
-                } else {
-                    result.tryFailure(failed.get());
-                }
-            } else {
-                result.trySuccess(count.get());
-            }
-        }
-    }
-    private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
-        Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
-        for (byte[] key : keys) {
-            int slot = executorService.getConnectionManager().calcSlot(key);
-            MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
-            List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
-            list.add(key);
-        }
-        RPromise<Long> result = new RedissonPromise<>();
-        AtomicReference<Throwable> failed = new AtomicReference<>();
-        AtomicLong count = new AtomicLong();
-        AtomicLong executed = new AtomicLong(range2key.size());
-        BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
-            if (u == null) {
-                List<Long> result1 = (List<Long>) r.getResponses();
-                for (Long res : result1) {
-                    if (res != null) {
-                        count.addAndGet(res);
-                    }
-                }
-            } else {
-                failed.set(u);
-            }
-            checkExecution(result, failed, count, executed);
-        };
-        for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
-            CommandBatchService es = new CommandBatchService(executorService);
-            for (byte[] key : entry.getValue()) {
-                es.writeAsync(entry.getKey(), null, command, key);
-            }
-            RFuture<BatchResult<?>> future = es.executeAsync();
-            future.onComplete(listener);
-        }
-        return result;
-    }
-    @Override
-    public Long del(byte[]... keys) {
-        if (isQueueing() || isPipelined()) {
-            for (byte[] key: keys) {
-                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
-            }
-            return null;
-        }
-        RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
-        return sync(f);
-    }
+    @Override
+    public Long del(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key: keys) {
+                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.writeAsync(key, StringCodec.INSTANCE, RedisCommands.DEL, key);
+        }
+        BatchResult<Long> b = (BatchResult<Long>) es.execute();
+        return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum();
+    }
+    @Override
+    public List<byte[]> mGet(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key : keys) {
+                read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+        }
+        BatchResult<byte[]> r = (BatchResult<byte[]>) es.execute();
+        return r.getResponses();
+    }
+    @Override
+    public Boolean mSet(Map<byte[], byte[]> tuple) {
+        if (isQueueing() || isPipelined()) {
+            for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+                write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+            }
+            return true;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+            es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+        }
+        es.execute();
+        return true;
+    }
}
@@ -15,26 +15,11 @@
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import io.netty.util.CharsetUtil;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@@ -47,8 +32,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
@@ -63,7 +46,10 @@ import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
*
@@ -502,75 +488,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
        return false;
    }
-    private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
-            AtomicLong count, AtomicLong executed) {
-        if (executed.decrementAndGet() == 0) {
-            if (failed.get() != null) {
-                if (count.get() > 0) {
-                    RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
-                    result.tryFailure(ex);
-                } else {
-                    result.tryFailure(failed.get());
-                }
-            } else {
-                result.trySuccess(count.get());
-            }
-        }
-    }
-    private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
-        Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
-        for (byte[] key : keys) {
-            int slot = executorService.getConnectionManager().calcSlot(key);
-            MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
-            List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
-            list.add(key);
-        }
-        RPromise<Long> result = new RedissonPromise<>();
-        AtomicReference<Throwable> failed = new AtomicReference<>();
-        AtomicLong count = new AtomicLong();
-        AtomicLong executed = new AtomicLong(range2key.size());
-        BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
-            if (u == null) {
-                List<Long> result1 = (List<Long>) r.getResponses();
-                for (Long res : result1) {
-                    if (res != null) {
-                        count.addAndGet(res);
-                    }
-                }
-            } else {
-                failed.set(u);
-            }
-            checkExecution(result, failed, count, executed);
-        };
-        for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
-            CommandBatchService es = new CommandBatchService(executorService);
-            for (byte[] key : entry.getValue()) {
-                es.writeAsync(entry.getKey(), null, command, key);
-            }
-            RFuture<BatchResult<?>> future = es.executeAsync();
-            future.onComplete(listener);
-        }
-        return result;
-    }
-    @Override
-    public Long del(byte[]... keys) {
-        if (isQueueing() || isPipelined()) {
-            for (byte[] key: keys) {
-                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
-            }
-            return null;
-        }
-        RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
-        return sync(f);
-    }
+    @Override
+    public Long del(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key: keys) {
+                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.writeAsync(key, StringCodec.INSTANCE, RedisCommands.DEL, key);
+        }
+        BatchResult<Long> b = (BatchResult<Long>) es.execute();
+        return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum();
+    }
+    @Override
+    public List<byte[]> mGet(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key : keys) {
+                read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+        }
+        BatchResult<byte[]> r = (BatchResult<byte[]>) es.execute();
+        return r.getResponses();
+    }
+    @Override
+    public Boolean mSet(Map<byte[], byte[]> tuple) {
+        if (isQueueing() || isPipelined()) {
+            for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+                write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+            }
+            return true;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+            es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+        }
+        es.execute();
+        return true;
+    }
}
@@ -15,26 +15,11 @@
*/
package org.redisson.spring.data.connection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import io.netty.util.CharsetUtil;
import org.redisson.api.BatchResult;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
@@ -47,8 +32,6 @@ import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.DefaultedRedisClusterConnection;
@@ -63,7 +46,10 @@ import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
*
@@ -502,75 +488,56 @@ public class RedissonClusterConnection extends RedissonConnection implements Def
        return false;
    }
-    private void checkExecution(RPromise<Long> result, AtomicReference<Throwable> failed,
-            AtomicLong count, AtomicLong executed) {
-        if (executed.decrementAndGet() == 0) {
-            if (failed.get() != null) {
-                if (count.get() > 0) {
-                    RedisException ex = new RedisException("" + count.get() + " keys has been deleted. But one or more nodes has an error", failed.get());
-                    result.tryFailure(ex);
-                } else {
-                    result.tryFailure(failed.get());
-                }
-            } else {
-                result.trySuccess(count.get());
-            }
-        }
-    }
-    private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, byte[] ... keys) {
-        Map<MasterSlaveEntry, List<byte[]>> range2key = new HashMap<>();
-        for (byte[] key : keys) {
-            int slot = executorService.getConnectionManager().calcSlot(key);
-            MasterSlaveEntry entry = executorService.getConnectionManager().getEntry(slot);
-            List<byte[]> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
-            list.add(key);
-        }
-        RPromise<Long> result = new RedissonPromise<>();
-        AtomicReference<Throwable> failed = new AtomicReference<>();
-        AtomicLong count = new AtomicLong();
-        AtomicLong executed = new AtomicLong(range2key.size());
-        BiConsumer<BatchResult<?>, Throwable> listener = (r, u) -> {
-            if (u == null) {
-                List<Long> result1 = (List<Long>) r.getResponses();
-                for (Long res : result1) {
-                    if (res != null) {
-                        count.addAndGet(res);
-                    }
-                }
-            } else {
-                failed.set(u);
-            }
-            checkExecution(result, failed, count, executed);
-        };
-        for (Entry<MasterSlaveEntry, List<byte[]>> entry : range2key.entrySet()) {
-            CommandBatchService es = new CommandBatchService(executorService);
-            for (byte[] key : entry.getValue()) {
-                es.writeAsync(entry.getKey(), null, command, key);
-            }
-            RFuture<BatchResult<?>> future = es.executeAsync();
-            future.onComplete(listener);
-        }
-        return result;
-    }
-    @Override
-    public Long del(byte[]... keys) {
-        if (isQueueing() || isPipelined()) {
-            for (byte[] key: keys) {
-                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
-            }
-            return null;
-        }
-        RFuture<Long> f = executeAsync(RedisCommands.DEL, keys);
-        return sync(f);
-    }
+    @Override
+    public Long del(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key: keys) {
+                write(key, LongCodec.INSTANCE, RedisCommands.DEL, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.writeAsync(key, StringCodec.INSTANCE, RedisCommands.DEL, key);
+        }
+        BatchResult<Long> b = (BatchResult<Long>) es.execute();
+        return b.getResponses().stream().collect(Collectors.summarizingLong(v -> v)).getSum();
+    }
+    @Override
+    public List<byte[]> mGet(byte[]... keys) {
+        if (isQueueing() || isPipelined()) {
+            for (byte[] key : keys) {
+                read(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+            }
+            return null;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (byte[] key: keys) {
+            es.readAsync(key, ByteArrayCodec.INSTANCE, RedisCommands.GET, key);
+        }
+        BatchResult<byte[]> r = (BatchResult<byte[]>) es.execute();
+        return r.getResponses();
+    }
+    @Override
+    public Boolean mSet(Map<byte[], byte[]> tuple) {
+        if (isQueueing() || isPipelined()) {
+            for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+                write(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+            }
+            return true;
+        }
+        CommandBatchService es = new CommandBatchService(executorService);
+        for (Entry<byte[], byte[]> entry: tuple.entrySet()) {
+            es.writeAsync(entry.getKey(), StringCodec.INSTANCE, RedisCommands.SET, entry.getKey(), entry.getValue());
+        }
+        es.execute();
+        return true;
+    }
}
@@ -13,7 +13,6 @@ import org.redisson.config.Config;
import org.redisson.config.SubscriptionMode;
import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.balancer.RandomLoadBalancer;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.ClusterInfo;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -23,8 +22,7 @@ import org.springframework.data.redis.core.types.RedisClientInfo;
import java.io.IOException;
import java.util.*;
import static org.assertj.core.api.Assertions.*;
import static org.redisson.connection.MasterSlaveConnectionManager.MAX_SLOT;
import static org.assertj.core.api.Assertions.assertThat;
public class RedissonClusterConnectionTest {
@@ -64,6 +62,40 @@ public class RedissonClusterConnectionTest {
redisson.shutdown();
}
@Test
public void testDel() {
List<byte[]> keys = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] key = ("test" + i).getBytes();
keys.add(key);
connection.set(key, ("test" + i).getBytes());
}
assertThat(connection.del(keys.toArray(new byte[0][]))).isEqualTo(10);
}
@Test
public void testMSet() {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
for (Map.Entry<byte[], byte[]> entry : map.entrySet()) {
assertThat(connection.get(entry.getKey())).isEqualTo(entry.getValue());
}
}
@Test
public void testMGet() {
Map<byte[], byte[]> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
map.put(("test" + i).getBytes(), ("test" + i*100).getBytes());
}
connection.mSet(map);
List<byte[]> r = connection.mGet(map.keySet().toArray(new byte[0][]));
assertThat(r).containsExactly(map.values().toArray(new byte[0][]));
}
@Test
public void testClusterGetNodes() {
Iterable<RedisClusterNode> nodes = connection.clusterGetNodes();
@@ -194,17 +226,6 @@
assertThat(res).isEqualTo(1);
}
@Test
public void testDel() {
List<byte[]> keys = new ArrayList<>();
for (int i = 0; i < 10; i++) {
byte[] key = ("test" + i).getBytes();
keys.add(key);
connection.set(key, ("test" + i).getBytes());
}
connection.del(keys.toArray(new byte[0][]));
}
@Test
public void testResetConfigStats() {
RedisClusterNode master = getFirstMaster();
......