提交 72cc6779 编写于 作者: M Matteo Merli 提交者: GitHub

Issue #80: Use receiveAsync() to aggregate messages into shared queue for...

Issue #80: Use receiveAsync() to aggregate messages into shared queue for partitioned consumer (#106)
......@@ -125,9 +125,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ExecutorService lookupIoExecutor = new ThreadPoolExecutor(1, 16, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new DefaultThreadFactory("pulsar-lookup"));
private DistributedIdGenerator producerNameGenerator;
private final static String producerNameGeneratorPath = "/counters/producer-name";
......@@ -385,11 +382,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
if (configuration.isUseTls() && !data.getServiceUrlTls().isEmpty()) {
return new PulsarClientImpl(data.getServiceUrlTls(), configuration, this.workerGroup,
this.lookupIoExecutor);
return new PulsarClientImpl(data.getServiceUrlTls(), configuration, this.workerGroup);
} else {
return new PulsarClientImpl(data.getServiceUrl(), configuration, this.workerGroup,
this.lookupIoExecutor);
return new PulsarClientImpl(data.getServiceUrl(), configuration, this.workerGroup);
}
} catch (Exception e) {
throw new RuntimeException(e);
......
......@@ -29,7 +29,6 @@ import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected;
......@@ -197,7 +196,7 @@ public class ClientCnx extends PulsarHandler {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId());
}
}
@Override
protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
log.info("Received Broker lookup response: {}", lookupResult.getResponse());
......@@ -225,7 +224,7 @@ public class ClientCnx extends PulsarHandler {
log.warn("{} Received unknown request id from server: {}", ctx.channel(), lookupResult.getRequestId());
}
}
@Override
protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse lookupResult) {
log.info("Received Broker Partition response: {}", lookupResult.getPartitions());
......@@ -262,7 +261,7 @@ public class ClientCnx extends PulsarHandler {
long sequenceId = sendError.getSequenceId();
producers.get(producerId).recoverChecksumError(this, sequenceId);
} else {
ctx.close();
ctx.close();
}
}
......@@ -307,7 +306,7 @@ public class ClientCnx extends PulsarHandler {
log.warn("Consumer with id {} not found while closing consumer ", consumerId);
}
}
@Override
protected boolean isHandshakeCompleted() {
return state == State.Ready;
......
......@@ -47,10 +47,12 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
protected final ExecutorService listenerExecutor;
final BlockingQueue<Message> incomingMessages;
protected final ConcurrentLinkedQueue<CompletableFuture<Message>> pendingReceives;
protected final int maxReceiverQueueSize;
protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture, boolean useGrowableQueue) {
ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
super(client, topic);
this.maxReceiverQueueSize = conf.getReceiverQueueSize();
this.subscription = subscription;
this.conf = conf;
this.consumerName = conf.getConsumerName() == null
......@@ -59,11 +61,10 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
this.listener = conf.getMessageListener();
if (conf.getReceiverQueueSize() <= 1) {
this.incomingMessages = Queues.newArrayBlockingQueue(1);
} else if (useGrowableQueue) {
this.incomingMessages = new GrowableArrayBlockingQueue<>();
} else {
this.incomingMessages = Queues.newArrayBlockingQueue(conf.getReceiverQueueSize());
this.incomingMessages = new GrowableArrayBlockingQueue<>();
}
this.listenerExecutor = listenerExecutor;
this.pendingReceives = Queues.newConcurrentLinkedQueue();
}
......
......@@ -96,7 +96,7 @@ public class ConsumerImpl extends ConsumerBase {
ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer> subscribeFuture) {
super(client, topic, subscription, conf, listenerExecutor, subscribeFuture, true /* use growable queue */);
super(client, topic, subscription, conf, listenerExecutor, subscribeFuture);
this.consumerId = client.newConsumerId();
this.availablePermits = new AtomicInteger(0);
this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
......@@ -202,7 +202,7 @@ public class ConsumerImpl extends ConsumerBase {
}
if (message == null && conf.getReceiverQueueSize() == 0) {
receiveMessages(cnx(), 1);
sendFlowPermitsToBroker(cnx(), 1);
} else if (message != null) {
messageProcessed(message);
result.complete(message);
......@@ -226,7 +226,7 @@ public class ConsumerImpl extends ConsumerBase {
waitingOnReceiveForZeroQueueSize = true;
synchronized (this) {
if (isConnected()) {
receiveMessages(cnx(), 1);
sendFlowPermitsToBroker(cnx(), 1);
}
}
do {
......@@ -465,7 +465,7 @@ public class ConsumerImpl extends ConsumerBase {
// If the connection is reset and someone is waiting for the messages
// send a flow command
if (waitingOnReceiveForZeroQueueSize) {
receiveMessages(cnx, 1);
sendFlowPermitsToBroker(cnx, 1);
}
} else {
// Consumer was closed while reconnecting, close the connection to make sure the broker
......@@ -483,7 +483,7 @@ public class ConsumerImpl extends ConsumerBase {
// if the consumer is not partitioned or is re-connected and is partitioned, we send the flow
// command to receive messages
if (!(firstTimeConnect && partitionIndex > -1) && conf.getReceiverQueueSize() != 0) {
receiveMessages(cnx, conf.getReceiverQueueSize());
sendFlowPermitsToBroker(cnx, conf.getReceiverQueueSize());
}
}).exceptionally((e) -> {
cnx.removeConsumer(consumerId);
......@@ -518,7 +518,7 @@ public class ConsumerImpl extends ConsumerBase {
/**
* send the flow command to have the broker start pushing messages
*/
void receiveMessages(ClientCnx cnx, int numMessages) {
void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
if (cnx != null) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Adding {} additional permits", topic, subscription, numMessages);
......@@ -784,7 +784,7 @@ public class ConsumerImpl extends ConsumerBase {
while (available >= receiverQueueRefillThreshold) {
if (availablePermits.compareAndSet(available, 0)) {
receiveMessages(currentCnx, available);
sendFlowPermitsToBroker(currentCnx, available);
break;
} else {
available = availablePermits.get();
......@@ -882,7 +882,7 @@ public class ConsumerImpl extends ConsumerBase {
}
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
if (currentSize > 0) {
receiveMessages(cnx, currentSize);
sendFlowPermitsToBroker(cnx, currentSize);
}
return;
}
......
......@@ -20,7 +20,9 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
......@@ -42,19 +44,28 @@ import com.yahoo.pulsar.common.naming.DestinationName;
public class PartitionedConsumerImpl extends ConsumerBase {
private List<ConsumerImpl> consumers;
private int numPartitions;
private final ExecutorService internalListenerExecutor;
private final List<ConsumerImpl> consumers;
// Queue of partition consumers on which we have stopped calling receiveAsync() because the
// shared incoming queue was full
private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
// resume receiving from the paused consumer partitions
private final int sharedQueueResumeThreshold;
private final int numPartitions;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ConsumerStats stats;
PartitionedConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
int numPartitions, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
super(client, topic, subscription, conf, listenerExecutor, subscribeFuture, false /* use fixed size queue */ );
super(client, topic, subscription, conf, listenerExecutor, subscribeFuture);
this.consumers = Lists.newArrayListWithCapacity(numPartitions);
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
this.numPartitions = numPartitions;
// gets a new listener thread for the internal listener
this.internalListenerExecutor = client.internalExecutorProvider().getExecutor();
stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Partitioned Topics");
......@@ -67,10 +78,8 @@ public class PartitionedConsumerImpl extends ConsumerBase {
ConsumerConfiguration internalConfig = getInternalConsumerConfig();
for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
String partitionName = DestinationName.get(topic).getPartition(partitionIndex).toString();
// use the same internal listener executor for all the partitions so that when the consumer queue is full
// all the partitions get blocked
ConsumerImpl consumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
internalListenerExecutor, partitionIndex, new CompletableFuture<Consumer>());
client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<Consumer>());
consumers.add(consumer);
consumer.subscribeFuture().handle((cons, subscribeException) -> {
if (subscribeException != null) {
......@@ -82,7 +91,7 @@ public class PartitionedConsumerImpl extends ConsumerBase {
if (subscribeFail.get() == null) {
try {
// We have successfully created N consumers, so we can start receiving messages now
receiveMessages();
starReceivingMessages();
state.set(State.Ready);
subscribeFuture().complete(PartitionedConsumerImpl.this);
log.info("[{}] [{}] Created partitioned consumer", topic, subscription);
......@@ -104,9 +113,41 @@ public class PartitionedConsumerImpl extends ConsumerBase {
}
}
private void receiveMessages() throws PulsarClientException {
private void starReceivingMessages() throws PulsarClientException {
for (ConsumerImpl consumer : consumers) {
consumer.receiveMessages(consumer.cnx(), conf.getReceiverQueueSize());
consumer.sendFlowPermitsToBroker(consumer.cnx(), conf.getReceiverQueueSize());
receiveMessageFromConsumer(consumer);
}
}
private void receiveMessageFromConsumer(ConsumerImpl consumer) {
consumer.receiveAsync().thenAccept(message -> {
// Process the message, add to the queue and trigger listener or async callback
messageReceived(message);
if (incomingMessages.size() >= maxReceiverQueueSize) {
// No more space left in shared queue, mark this consumer to be resumed later
pausedConsumers.add(consumer);
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
client.eventLoopGroup().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
});
}
private void resumeReceivingFromPausedConsumersIfNeeded() {
if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
while (true) {
ConsumerImpl consumer = pausedConsumers.poll();
if (consumer == null) {
break;
}
receiveMessageFromConsumer(consumer);
}
}
}
......@@ -115,6 +156,7 @@ public class PartitionedConsumerImpl extends ConsumerBase {
Message message;
try {
message = incomingMessages.take();
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
......@@ -127,6 +169,7 @@ public class PartitionedConsumerImpl extends ConsumerBase {
Message message;
try {
message = incomingMessages.poll(timeout, unit);
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
......@@ -145,6 +188,7 @@ public class PartitionedConsumerImpl extends ConsumerBase {
if (message == null) {
pendingReceives.add(result);
} else {
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}
} catch (InterruptedException e) {
......@@ -278,40 +322,21 @@ public class PartitionedConsumerImpl extends ConsumerBase {
}
void messageReceived(Message message) {
boolean shouldLock = false;
lock.readLock().lock();
try {
/**
* this method is thread-safe: as only 1 ConsumerImpl thread calls at a time. Lock only if incomingMessages
* (blocking-queue) is not full: else put(..) will block thread and will hold lock which may create deadlock
*/
shouldLock = !(incomingMessages.remainingCapacity() == 0);
if (shouldLock) {
lock.readLock().lock();
}
// if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
if (!pendingReceives.isEmpty()) {
CompletableFuture<Message> receivedFuture = pendingReceives.poll();
listenerExecutor.execute(() -> receivedFuture.complete(message));
// unlock if it is already locked
if (shouldLock) {
lock.readLock().unlock();
}
} else {
// Enqueue the message so that it can be retrieved when application calls receive()
// Waits for the queue to have space for the message
incomingMessages.put(message);
// unlock if it is already locked
if (shouldLock) {
lock.readLock().unlock();
}
}
} catch (InterruptedException e) {
// unlock if it is already locked
if (shouldLock) {
lock.readLock().unlock();
}
Thread.currentThread().interrupt();
} finally {
lock.readLock().unlock();
}
if (listener != null) {
......@@ -350,11 +375,6 @@ public class PartitionedConsumerImpl extends ConsumerBase {
if (conf.getAckTimeoutMillis() != 0) {
internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
}
internalConsumerConfig.setMessageListener((consumer, msg) -> {
if (msg != null) {
messageReceived(msg);
}
});
return internalConsumerConfig;
}
......@@ -371,11 +391,11 @@ public class PartitionedConsumerImpl extends ConsumerBase {
for (ConsumerImpl c : consumers) {
List<MessageIdImpl> consumerMessageIds = new ArrayList<>();
messageIds.removeIf(messageId -> {
if (messageId.getPartitionIndex() == c.getPartitionIndex()) {
consumerMessageIds.add(messageId);
return true;
}
return false;
if (messageId.getPartitionIndex() == c.getPartitionIndex()) {
consumerMessageIds.add(messageId);
return true;
}
return false;
});
c.redeliverUnacknowledgedMessages(consumerMessageIds);
}
......
......@@ -20,9 +20,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
......@@ -62,7 +60,6 @@ public class PulsarClientImpl implements PulsarClient {
private final ConnectionPool cnxPool;
private final Timer timer;
private final ExecutorProvider externalExecutorProvider;
private final ExecutorProvider internalExecutorProvider;
enum State {
Open, Closing, Closed
......@@ -76,15 +73,18 @@ public class PulsarClientImpl implements PulsarClient {
private final AtomicLong consumerIdGenerator = new AtomicLong();
protected static final AtomicLong requestIdGenerator = new AtomicLong();
private final EventLoopGroup eventLoopGroup;
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
this(serviceUrl, conf, getEventLoopGroup(conf), null);
this(serviceUrl, conf, getEventLoopGroup(conf));
}
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup,
ExecutorService lookupIoExecutor) throws PulsarClientException {
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
if (serviceUrl == null || conf == null || eventLoopGroup == null) {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
this.eventLoopGroup = eventLoopGroup;
this.conf = conf;
conf.getAuthentication().start();
cnxPool = new ConnectionPool(this, eventLoopGroup);
......@@ -97,7 +97,6 @@ public class PulsarClientImpl implements PulsarClient {
}
timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
externalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-external-listener");
internalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-internal-listener");
producers = Maps.newIdentityHashMap();
consumers = Maps.newIdentityHashMap();
state.set(State.Open);
......@@ -328,13 +327,12 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public void shutdown() throws PulsarClientException {
try {
if (httpClient!= null) {
httpClient.close();
if (httpClient != null) {
httpClient.close();
}
cnxPool.close();
timer.stop();
externalExecutorProvider.shutdownNow();
internalExecutorProvider.shutdownNow();
conf.getAuthentication().close();
} catch (Throwable t) {
log.warn("Failed to shutdown Pulsar client", t);
......@@ -342,10 +340,10 @@ public class PulsarClientImpl implements PulsarClient {
}
}
protected CompletableFuture<ClientCnx> getConnection(final String topic) {
DestinationName destinationName = DestinationName.get(topic);
return lookup.getBroker(destinationName).thenCompose((brokerAddress) -> cnxPool.getConnection(brokerAddress));
}
protected CompletableFuture<ClientCnx> getConnection(final String topic) {
DestinationName destinationName = DestinationName.get(topic);
return lookup.getBroker(destinationName).thenCompose((brokerAddress) -> cnxPool.getConnection(brokerAddress));
}
protected Timer timer() {
return timer;
......@@ -355,10 +353,6 @@ public class PulsarClientImpl implements PulsarClient {
return externalExecutorProvider;
}
ExecutorProvider internalExecutorProvider() {
return internalExecutorProvider;
}
long newProducerId() {
return producerIdGenerator.getAndIncrement();
}
......@@ -371,6 +365,10 @@ public class PulsarClientImpl implements PulsarClient {
return requestIdGenerator.getAndIncrement();
}
EventLoopGroup eventLoopGroup() {
return eventLoopGroup;
}
private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture;
......
......@@ -179,8 +179,7 @@ public class PerformanceConsumer {
if (isNotBlank(arguments.authPluginClassName)) {
clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
}
PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup,
Executors.newFixedThreadPool(16));
PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup);
List<Future<Consumer>> futures = Lists.newArrayList();
ConsumerConfiguration consumerConfig = new ConsumerConfiguration();
......
......@@ -211,8 +211,7 @@ public class PerformanceProducer {
clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams);
}
PulsarClient client = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup,
Executors.newFixedThreadPool(16));
PulsarClient client = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup);
ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setSendTimeout(0, TimeUnit.SECONDS);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册