提交 d89563ce 编写于 作者: R Rajan 提交者: GitHub

connect dispatcher again on reset cursor complete (#250)

* connect dispatcher again on reset cursor complete

* Renaming methods

* Only disconnect consumers on cursor-reset
上级 ae381b4c
......@@ -40,11 +40,29 @@ public interface Dispatcher {
boolean canUnsubscribe(Consumer consumer);
CompletableFuture<Void> disconnect();
/**
* mark dispatcher closed to stop new incoming requests and disconnect all consumers
*
* @return
*/
CompletableFuture<Void> close();
/**
* disconnect all consumers
*
* @return
*/
CompletableFuture<Void> disconnectAllConsumers();
/**
* mark dispatcher open to serve new incoming requests
*/
void reset();
SubType getType();
void redeliverUnacknowledgedMessages(Consumer consumer);
void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);
}
......@@ -20,6 +20,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
......@@ -36,12 +37,12 @@ import com.carrotsearch.hppc.ObjectSet;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.Dispatcher;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.client.impl.Backoff;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.utils.CopyOnWriteArrayList;
/**
......@@ -68,6 +69,11 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
private int totalAvailablePermits = 0;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> IS_CLOSED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;
enum ReadType {
Normal, Replay
......@@ -83,6 +89,10 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
@Override
public synchronized void addConsumer(Consumer consumer) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer);
consumer.disconnect();
}
if (consumerList.isEmpty()) {
if (havePendingRead || havePendingReplayRead) {
// There is a pending read from previous run. We must wait for it to complete and then rewind
......@@ -210,7 +220,13 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
}
@Override
public synchronized CompletableFuture<Void> disconnect() {
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}
@Override
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();
if (consumerList.isEmpty()) {
closeFuture.complete(null);
......@@ -223,6 +239,11 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
return closeFuture;
}
@Override
public void reset() {
IS_CLOSED_UPDATER.set(this, FALSE);
}
@Override
public SubType getType() {
return SubType.Shared;
......@@ -400,7 +421,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
* @return nextAvailableConsumer
*/
public Consumer getNextConsumer() {
if (consumerList.isEmpty() || closeFuture != null) {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return null;
}
......@@ -466,7 +487,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
* @return
*/
private boolean isAtleastOneConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return false;
}
......
......@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
......@@ -60,6 +61,11 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
private static final int MaxReadBatchSize = 100;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherSingleActiveConsumer> IS_CLOSED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherSingleActiveConsumer.class, "isClosed");
private volatile int isClosed = FALSE;
public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic) {
......@@ -99,6 +105,10 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", this.topic.getName(), consumer);
consumer.disconnect();
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
throw new ConsumerBusyException("Exclusive consumer is already connected");
}
......@@ -149,6 +159,12 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
}
@Override
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}
/**
* Disconnect all consumers on this dispatcher (server side close). This triggers channelInactive on the inbound
* handler which calls dispatcher.removeConsumer(), where the closeFuture is completed
......@@ -156,7 +172,7 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
* @return
*/
@Override
public synchronized CompletableFuture<Void> disconnect() {
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();
if (!consumers.isEmpty()) {
......@@ -171,6 +187,11 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
return closeFuture;
}
@Override
public void reset() {
IS_CLOSED_UPDATER.set(this, FALSE);
}
@Override
public synchronized void readEntriesComplete(final List<Entry> entries, Object obj) {
Consumer readConsumer = (Consumer) obj;
......
......@@ -332,7 +332,7 @@ public class PersistentSubscription implements Subscription {
return;
}
dispatcher.disconnect().whenComplete((aVoid, throwable) -> {
dispatcher.disconnectAllConsumers().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
......@@ -471,13 +471,13 @@ public class PersistentSubscription implements Subscription {
// block any further consumers on this subscription
IS_FENCED_UPDATER.set(this, TRUE);
(dispatcher != null ? dispatcher.disconnect() : CompletableFuture.completedFuture(null))
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
.thenCompose(v -> close()).thenRun(() -> {
log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
disconnectFuture.complete(null);
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);
dispatcher.reset();
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
exception);
disconnectFuture.completeExceptionally(exception);
......
......@@ -16,17 +16,25 @@
package com.yahoo.pulsar.client.impl;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
......@@ -38,16 +46,18 @@ import com.yahoo.pulsar.broker.service.Topic;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.MessageListener;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.HandlerBase.State;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap;
public class BrokerClientIntegrationTest extends ProducerConsumerBase {
......@@ -337,5 +347,124 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase {
batchProducer.close();
log.info("-- Exiting {} test --", methodName);
}
@Test(timeOut = 10000, dataProvider = "subType")
public void testResetCursor(SubscriptionType subType) throws Exception {
final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
final DestinationName destName = DestinationName.get("persistent://my-property/use/my-ns/unacked-topic");
final int warmup = 20;
final int testSize = 150;
final List<Message> received = new ArrayList<Message>();
final ConsumerConfiguration consConfig = new ConsumerConfiguration();
final String subsId = "sub";
final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();
consConfig.setSubscriptionType(subType);
consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> {
try {
synchronized (received) {
received.add(msg);
}
consumer.acknowledge(msg);
long publishTime = ((MessageImpl) msg).getPublishTime();
System.out.println(" publish time is " + publishTime + "," + msg.getMessageId());
TimestampEntryCount timestampEntryCount = publishTimeIdMap.computeIfAbsent(publishTime,
(k) -> new TimestampEntryCount(publishTime));
timestampEntryCount.incrementAndGet();
} catch (final PulsarClientException e) {
System.out.println("Failed to ack!");
}
});
admin.namespaces().setRetention(destName.getNamespace(), policy);
Consumer consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
final Producer producer = pulsarClient.createProducer(destName.toString());
log.info("warm up started for " + destName.toString());
// send warmup msgs
byte[] msgBytes = new byte[1000];
for (Integer i = 0; i < warmup; i++) {
producer.send(msgBytes);
}
log.info("warm up finished.");
// sleep to ensure receiving of msgs
for (int n = 0; n < 10 && received.size() < warmup; n++) {
Thread.sleep(100);
}
// validate received msgs
Assert.assertEquals(received.size(), warmup);
received.clear();
// publish testSize num of msgs
System.out.println("Sending more messages.");
for (Integer n = 0; n < testSize; n++) {
producer.send(msgBytes);
Thread.sleep(1);
}
log.info("Sending more messages done.");
Thread.sleep(3000);
long begints = publishTimeIdMap.firstEntry().getKey();
long endts = publishTimeIdMap.lastEntry().getKey();
// find reset timestamp
long timestamp = (endts - begints) / 2 + begints;
timestamp = publishTimeIdMap.floorKey(timestamp);
NavigableMap<Long, TimestampEntryCount> expectedMessages = new ConcurrentSkipListMap<>();
expectedMessages.putAll(publishTimeIdMap.tailMap(timestamp, true));
received.clear();
log.info("reset cursor to " + timestamp + " for topic " + destName.toString() + " for subs " + subsId);
System.out.println("issuing admin operation on " + admin.getServiceUrl().toString());
List<String> subList = admin.persistentTopics().getSubscriptions(destName.toString());
for (String subs : subList) {
log.info("got sub " + subs);
}
publishTimeIdMap.clear();
// reset the cursor to this timestamp
Assert.assertTrue(subList.contains(subsId));
admin.persistentTopics().resetCursor(destName.toString(), subsId, timestamp);
consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
Thread.sleep(3000);
int totalExpected = 0;
for (TimestampEntryCount tec : expectedMessages.values()) {
totalExpected += tec.numMessages;
}
// validate that replay happens after the timestamp
Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);
consumer.close();
producer.close();
// validate that expected and received counts match
int totalReceived = 0;
for (TimestampEntryCount tec : publishTimeIdMap.values()) {
totalReceived += tec.numMessages;
}
Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
}
private static class TimestampEntryCount {
private final long timestamp;
private int numMessages;
public TimestampEntryCount(long ts) {
this.numMessages = 0;
this.timestamp = ts;
}
public int incrementAndGet() {
return ++numMessages;
}
public long getTimestamp() {
return timestamp;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册