提交 c296910b 编写于 作者: N Nikita Koksharov

Fixed - RPriorityBlockingQueue.poll(int limited) method implemented

上级 c23f5c8d
......@@ -220,7 +220,20 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
@Override
public RFuture<List<V>> pollAsync(int limit) {
throw new UnsupportedOperationException();
return pollAsync(() -> {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_LIST,
"local result = {};"
+ "for i = 1, ARGV[1], 1 do " +
"local value = redis.call('lpop', KEYS[1]);" +
"if value ~= false then " +
"table.insert(result, value);" +
"else " +
"return result;" +
"end;" +
"end; " +
"return result;",
Collections.singletonList(getName()), limit);
});
}
@Override
......@@ -235,6 +248,6 @@ public class RedissonPriorityBlockingQueue<V> extends RedissonPriorityQueue<V> i
@Override
public List<V> poll(int limit) {
throw new UnsupportedOperationException();
return get(pollAsync(limit));
}
}
\ No newline at end of file
......@@ -26,6 +26,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
......@@ -290,15 +291,21 @@ public class RedissonPriorityQueue<V> extends RedissonList<V> implements RPriori
}
protected <T> RFuture<V> pollAsync(RedisCommand<T> command, Object... params) {
return pollAsync(() -> {
return commandExecutor.writeAsync(getName(), codec, command, params);
});
};
protected final <T, R> RFuture<R> pollAsync(Supplier<RFuture<R>> callable) {
long threadId = Thread.currentThread().getId();
RPromise<V> result = new RedissonPromise<V>();
RPromise<R> result = new RedissonPromise<R>();
lock.lockAsync(threadId).onComplete((r, exc) -> {
if (exc != null) {
result.tryFailure(exc);
return;
}
RFuture<V> f = commandExecutor.writeAsync(getName(), codec, command, params);
RFuture<R> f = callable.get();
f.onComplete((value, e) -> {
if (e != null) {
result.tryFailure(e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册