提交 6560ce10 编写于 作者: N Nikita Koksharov

Fixed - RBlockingDequeReactive.takeElements() method does not consume all elements. #3673

上级 588669d6
......@@ -15,15 +15,14 @@
*/
package org.redisson.reactive;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
*
* @author Nikita Koksharov
......@@ -31,7 +30,7 @@ import reactor.core.publisher.FluxSink;
*/
public class ElementsStream {
private static <V> void take(final Callable<RFuture<V>> factory, final FluxSink<V> emitter, final AtomicLong counter, final AtomicReference<RFuture<V>> futureRef) {
private static <V> void take(Callable<RFuture<V>> factory, FluxSink<V> emitter, AtomicLong counter, AtomicReference<RFuture<V>> futureRef) {
RFuture<V> future;
try {
future = factory.call();
......@@ -48,7 +47,7 @@ public class ElementsStream {
emitter.next(res);
if (counter.decrementAndGet() == 0) {
emitter.complete();
return;
}
take(factory, emitter, counter, futureRef);
......@@ -56,14 +55,14 @@ public class ElementsStream {
}
public static <V> Flux<V> takeElements(Callable<RFuture<V>> callable) {
return Flux.<V>create(emitter -> {
return Flux.create(emitter -> {
AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
emitter.onRequest(n -> {
AtomicLong counter = new AtomicLong(n);
AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
take(callable, emitter, counter, futureRef);
emitter.onDispose(() -> {
futureRef.get().cancel(true);
});
});
emitter.onDispose(() -> {
futureRef.get().cancel(true);
});
});
}
......
package org.redisson;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RBlockingDequeReactive;
import org.redisson.api.RBlockingQueueReactive;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
......@@ -19,6 +25,30 @@ import static org.assertj.core.api.Assertions.assertThat;
public class RedissonBlockingQueueReactiveTest extends BaseReactiveTest {
@Test
public void testTakeElements2() throws InterruptedException {
RBlockingDequeReactive<Long> queue = redisson.getBlockingDeque("test");
Mono<Void> mono = Flux.range(1, 100)
.flatMap(s -> queue.add(Long.valueOf(s)))
.then();
StepVerifier.create(mono).verifyComplete();
AtomicInteger counter = new AtomicInteger();
queue.takeElements()
.doOnNext(e -> {
counter.incrementAndGet();
})
.delayElements(Duration.ofMillis(2))
.repeat()
.subscribe();
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
assertThat(counter.get()).isEqualTo(100);
});
}
@Test
public void testTakeElements() {
RBlockingQueueReactive<Integer> queue = redisson.getBlockingQueue("test");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册