未验证 提交 cb34e6a5 编写于 作者: M Matteo Merli 提交者: GitHub

Moved entries filtering from consumer to dispatcher (#4329)

* Moved entries filtering from consumer to dispatcher

* Optimize when all messages were filtered out

* Fixed pending adds adding and re-added check for older consumers

* Fixed reusing of thread local from different thread

* Pass the redelivery tracker to consumer.sendMessages()
上级 bf06ef3e
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.List;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
public abstract class AbstractBaseDispatcher {
protected final Subscription subscription;
protected AbstractBaseDispatcher(Subscription subscription) {
this.subscription = subscription;
}
/**
* Filter messages that are being sent to a consumers.
* <p>
* Messages can be filtered out for multiple reasons:
* <ul>
* <li>Checksum or metadata corrupted
* <li>Message is an internal marker
* <li>Message is not meant to be delivered immediately
* </ul>
*
* @param entries
* a list of entries as read from storage
*
* @param batchSizes
* an array where the batch size for each entry (the number of messages within an entry) is stored. This
* array needs to be of at least the same size as the entries list
*
* @param sendMessageInfo
* an object where the total size in messages and bytes will be returned back to the caller
* @param subscription
* the subscription object
*/
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo) {
int totalMessages = 0;
long totalBytes = 0;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
ByteBuf metadataAndPayload = entry.getDataBuffer();
PositionImpl pos = (PositionImpl) entry.getPosition();
MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
try {
if (msgMetadata == null) {
// Message metadata was corrupted
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Collections.emptyMap());
continue;
}
int batchSize = msgMetadata.getNumMessagesInBatch();
totalMessages += batchSize;
totalBytes += metadataAndPayload.readableBytes();
batchSizes.setBatchSize(i, batchSize);
} finally {
msgMetadata.recycle();
}
}
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
}
}
......@@ -28,7 +28,7 @@ import com.carrotsearch.hppc.ObjectSet;
/**
*/
public abstract class AbstractDispatcherMultipleConsumers {
public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDispatcher {
protected final CopyOnWriteArrayList<Consumer> consumerList = new CopyOnWriteArrayList<>();
protected final ObjectSet<Consumer> consumerSet = new ObjectHashSet<>();
......@@ -39,6 +39,11 @@ public abstract class AbstractDispatcherMultipleConsumers {
protected static final AtomicIntegerFieldUpdater<AbstractDispatcherMultipleConsumers> IS_CLOSED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(AbstractDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;
protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
super(subscription);
}
public boolean isConsumerConnected() {
return !consumerList.isEmpty();
}
......@@ -127,7 +132,7 @@ public abstract class AbstractDispatcherMultipleConsumers {
/**
* Finds index of first available consumer which has higher priority then given targetPriority
*
*
* @param targetPriority
* @return -1 if couldn't find any available consumer
*/
......@@ -187,7 +192,7 @@ public abstract class AbstractDispatcherMultipleConsumers {
/**
* Finds index of first consumer in list which has same priority as given targetPriority
*
*
* @param targetPriority
* @return
*/
......
......@@ -33,7 +33,7 @@ import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractDispatcherSingleActiveConsumer {
public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBaseDispatcher {
protected final String topicName;
protected static final AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer> ACTIVE_CONSUMER_UPDATER =
......@@ -53,7 +53,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer {
private volatile int isClosed = FALSE;
public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName) {
String topicName, Subscription subscription) {
super(subscription);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
......
......@@ -24,14 +24,13 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
......@@ -39,18 +38,14 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import lombok.Data;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
......@@ -108,10 +103,6 @@ public class Consumer {
private final Map<String, String> metadata;
public interface SendListener {
void sendComplete(ChannelFuture future, SendMessageInfo sendMessageInfo);
}
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId,
......@@ -190,60 +181,53 @@ public class Consumer {
*
* @return a SendMessageInfo object that contains the detail of what was sent to consumer
*/
public SendMessageInfo sendMessages(final List<Entry> entries) {
// Empty listener
return sendMessages(entries, null);
}
/**
* Dispatch a list of entries to the consumer. <br/>
* <b>It is also responsible to release entries data and recycle entries object.</b>
*
* @return a SendMessageInfo object that contains the detail of what was sent to consumer
*/
public SendMessageInfo sendMessages(final List<Entry> entries, SendListener listener) {
public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, int totalMessages,
long totalBytes, RedeliveryTracker redeliveryTracker) {
final ChannelHandlerContext ctx = cnx.ctx();
final SendMessageInfo sentMessages = new SendMessageInfo();
final ChannelPromise writePromise = listener != null ? ctx.newPromise() : ctx.voidPromise();
if (listener != null) {
writePromise.addListener(future -> listener.sendComplete(writePromise, sentMessages));
}
final ChannelPromise writePromise = ctx.newPromise();
if (entries.isEmpty()) {
if (entries.isEmpty() || totalMessages == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}",
topicName, subscription, consumerId);
}
writePromise.setSuccess();
sentMessages.totalSentMessages = 0;
sentMessages.totalSentMessageBytes = 0;
return sentMessages;
batchSizes.recyle();
return writePromise;
}
try {
updatePermitsAndPendingAcks(entries, sentMessages);
} catch (PulsarServerException pe) {
log.warn("[{}] [{}] consumer doesn't support batch-message {}", subscription, consumerId,
cnx.getRemoteEndpointProtocolVersion());
subscription.markTopicWithBatchMessagePublished();
sentMessages.totalSentMessages = 0;
sentMessages.totalSentMessageBytes = 0;
// disconnect consumer: it will update dispatcher's availablePermits and resend pendingAck-messages of this
// consumer to other consumer
disconnect();
return sentMessages;
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
incrementUnackedMessages(totalMessages);
msgOut.recordMultipleEvents(totalMessages, totalBytes);
ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
PositionImpl pos = (PositionImpl) entry.getPosition();
if (entry == null) {
// Entry was filtered out
continue;
}
int batchSize = batchSizes.getBatchSize(i);
if (pendingAcks != null) {
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0);
}
if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}",
topicName, subscription,
consumerId, entry.getLedgerId(), entry.getEntryId());
ctx.close();
entry.release();
continue;
}
MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
MessageIdData messageId = messageIdBuilder
.setLedgerId(pos.getLedgerId())
.setEntryId(pos.getEntryId())
.setLedgerId(entry.getLedgerId())
.setEntryId(entry.getEntryId())
.setPartition(partitionIdx)
.build();
......@@ -257,25 +241,23 @@ public class Consumer {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription,
consumerId, pos.getLedgerId(), pos.getEntryId());
consumerId, entry.getLedgerId(), entry.getEntryId());
}
// We only want to pass the "real" promise on the last entry written
ChannelPromise promise = ctx.voidPromise();
if (i == (entries.size() - 1)) {
promise = writePromise;
}
int redeliveryCount = subscription.getDispatcher().getRedeliveryTracker().getRedeliveryCount(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()));
ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload), promise);
int redeliveryCount = redeliveryTracker
.getRedeliveryCount(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()));
ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload), ctx.voidPromise());
messageId.recycle();
messageIdBuilder.recycle();
entry.release();
}
ctx.flush();
// Use an empty write here so that we can just tie the flush with the write promise for last entry
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
batchSizes.recyle();
});
return sentMessages;
return writePromise;
}
private void incrementUnackedMessages(int ackedMessages) {
......@@ -284,70 +266,6 @@ public class Consumer {
}
}
public static int getBatchSizeforEntry(ByteBuf metadataAndPayload, Subscription subscription, long consumerId) {
try {
// save the reader index and restore after parsing
metadataAndPayload.markReaderIndex();
PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.resetReaderIndex();
int batchSize = metadata.getNumMessagesInBatch();
metadata.recycle();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] num messages in batch are {} ", subscription, consumerId, batchSize);
}
return batchSize;
} catch (Throwable t) {
log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId, t);
}
return -1;
}
void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sentMessages) throws PulsarServerException {
int permitsToReduce = 0;
Iterator<Entry> iter = entries.iterator();
boolean unsupportedVersion = false;
long totalReadableBytes = 0;
boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion();
while (iter.hasNext()) {
Entry entry = iter.next();
ByteBuf metadataAndPayload = entry.getDataBuffer();
int batchSize = getBatchSizeforEntry(metadataAndPayload, subscription, consumerId);
if (batchSize == -1) {
// this would suggest that the message might have been corrupted
iter.remove();
PositionImpl pos = (PositionImpl) entry.getPosition();
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual, Collections.emptyMap());
continue;
}
if (pendingAcks != null) {
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0);
}
// check if consumer supports batch message
if (batchSize > 1 && !clientSupportBatchMessages) {
unsupportedVersion = true;
}
totalReadableBytes += metadataAndPayload.readableBytes();
permitsToReduce += batchSize;
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int permits = MESSAGE_PERMITS_UPDATER.addAndGet(this, -permitsToReduce);
incrementUnackedMessages(permitsToReduce);
if (unsupportedVersion) {
throw new PulsarServerException("Consumer does not support batch-message");
}
if (permits < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] [{}] message permits dropped below 0 - {}", topicName, subscription, consumerId,
permits);
}
}
msgOut.recordMultipleEvents(permitsToReduce, totalReadableBytes);
sentMessages.totalSentMessages = permitsToReduce;
sentMessages.totalSentMessageBytes = totalReadableBytes;
}
public boolean isWritable() {
return cnx.isWritable();
}
......@@ -680,26 +598,5 @@ public class Consumer {
subscription.addUnAckedMessages(-unaAckedMsgs);
}
public static final class SendMessageInfo {
private int totalSentMessages;
private long totalSentMessageBytes;
public int getTotalSentMessages() {
return totalSentMessages;
}
public void setTotalSentMessages(int totalSentMessages) {
this.totalSentMessages = totalSentMessages;
}
public long getTotalSentMessageBytes() {
return totalSentMessageBytes;
}
public void setTotalSentMessageBytes(long totalSentMessageBytes) {
this.totalSentMessageBytes = totalSentMessageBytes;
}
}
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import io.netty.util.Recycler;
public class EntryBatchSizes {
private int[] sizes = new int[100];
public int getBatchSize(int entryIdx) {
return sizes[entryIdx];
}
public int setBatchSize(int entryIdx, int batchSize) {
return sizes[entryIdx] = batchSize;
}
public void recyle() {
handle.recycle(this);
}
public static EntryBatchSizes get(int entriesListSize) {
EntryBatchSizes ebs = RECYCLER.get();
if (ebs.sizes.length < entriesListSize) {
ebs.sizes = new int[entriesListSize];
}
return ebs;
}
private EntryBatchSizes(Recycler.Handle<EntryBatchSizes> handle) {
this.handle = handle;
}
private final Recycler.Handle<EntryBatchSizes> handle;
private static final Recycler<EntryBatchSizes> RECYCLER = new Recycler<EntryBatchSizes>() {
@Override
protected EntryBatchSizes newObject(Handle<EntryBatchSizes> handle) {
return new EntryBatchSizes(handle);
}
};
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.Data;
@Data
public class SendMessageInfo {
private int totalMessages;
private long totalBytes;
private SendMessageInfo() {
// Private constructor so that all usages are through the thread-local instance
}
public static SendMessageInfo getThreadLocal() {
SendMessageInfo smi = THREAD_LOCAL.get();
smi.totalMessages = 0;
smi.totalBytes = 0;
return smi;
}
private static final FastThreadLocal<SendMessageInfo> THREAD_LOCAL = new FastThreadLocal<SendMessageInfo>() {
protected SendMessageInfo initialValue() throws Exception {
return new SendMessageInfo();
};
};
}
......@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;
import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
......@@ -31,9 +29,12 @@ import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
......@@ -59,6 +60,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
private final RedeliveryTracker redeliveryTracker;
public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
super(subscription);
this.topic = topic;
this.subscription = subscription;
this.name = topic.getName() + " / " + subscription.getName();
......@@ -191,10 +193,16 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
public void sendMessages(List<Entry> entries) {
Consumer consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? getNextConsumer() : null;
if (consumer != null) {
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -consumer.sendMessages(entries).getTotalSentMessages());
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo);
consumer.sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
} else {
entries.forEach(entry -> {
int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
if (totalMsgs > 0) {
msgDrop.recordEvent(totalMsgs);
}
......
......@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service.nonpersistent;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.Consumer.getBatchSizeforEntry;
import java.util.List;
......@@ -29,9 +28,12 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
......@@ -43,10 +45,10 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
private final Subscription subscription;
private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;
public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName());
super(subscriptionType, partitionIndex, topic.getName(), subscription);
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
......@@ -58,10 +60,14 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
public void sendMessages(List<Entry> entries) {
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
currentConsumer.sendMessages(entries);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo);
currentConsumer.sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
} else {
entries.forEach(entry -> {
int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
if (totalMsgs > 0) {
msgDrop.recordEvent(totalMsgs);
}
......
......@@ -21,6 +21,9 @@ package org.apache.pulsar.broker.service.persistent;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import java.util.Set;
......@@ -42,11 +45,13 @@ import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
......@@ -59,9 +64,6 @@ import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
/**
*/
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
......@@ -71,7 +73,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
private CompletableFuture<Void> closeFuture = null;
LongPairSet messagesToReplay = new ConcurrentSortedLongPairSet(128, 2);
private final RedeliveryTracker redeliveryTracker;
protected final RedeliveryTracker redeliveryTracker;
private boolean havePendingRead = false;
private boolean havePendingReplayRead = false;
......@@ -95,7 +97,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
Normal, Replay
}
public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) {
public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
super(subscription);
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.cursor = cursor;
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
......@@ -440,14 +443,21 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
});
}
SendMessageInfo sentMsgInfo = c.sendMessages(entries.subList(start, start + messagesForC));
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo);
c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), redeliveryTracker);
long msgSent = sentMsgInfo.getTotalSentMessages();
long msgSent = sendMessageInfo.getTotalMessages();
start += messagesForC;
entriesToDispatch -= messagesForC;
totalAvailablePermits -= msgSent;
totalMessagesSent += sentMsgInfo.getTotalSentMessages();
totalBytesSent += sentMsgInfo.getTotalSentMessageBytes();
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}
}
......
......@@ -40,8 +40,11 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
......@@ -68,8 +71,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
private final RedeliveryTracker redeliveryTracker;
public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic) {
super(subscriptionType, partitionIndex, topic.getName());
PersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription);
this.topic = topic;
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
......@@ -208,38 +211,48 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
readMoreEntries(currentConsumer);
}
} else {
currentConsumer.sendMessages(entries, (future, sentMsgInfo) -> {
if (future.isSuccess()) {
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(sentMsgInfo.getTotalSentMessages(),
sentMsgInfo.getTotalSentMessageBytes());
}
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(sentMsgInfo.getTotalSentMessages(),
sentMsgInfo.getTotalSentMessageBytes());
}
}
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo);
int totalMessages = sendMessageInfo.getTotalMessages();
long totalBytes = sendMessageInfo.getTotalBytes();
currentConsumer
.sendMessages(entries, batchSizes, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), redeliveryTracker)
.addListener(future -> {
if (future.isSuccess()) {
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessages, totalBytes);
}
// Schedule a new read batch operation only after the previous batch has been written to the socket
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
if (newConsumer != null && !havePendingRead) {
readMoreEntries(newConsumer);
} else {
if (log.isDebugEnabled()) {
log.debug(
"[{}-{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}",
name, newConsumer, newConsumer != null, havePendingRead);
if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessages, totalBytes);
}
}
// Schedule a new read batch operation only after the previous batch has been written to the
// socket
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
if (newConsumer != null && !havePendingRead) {
readMoreEntries(newConsumer);
} else {
if (log.isDebugEnabled()) {
log.debug(
"[{}-{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}",
name, newConsumer, newConsumer != null, havePendingRead);
}
}
}
}));
}
}));
}
});
});
}
}
......
......@@ -19,34 +19,37 @@
package org.apache.pulsar.broker.service.persistent;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
public static final String NONE_KEY = "NONE_KEY";
private final StickyKeyConsumerSelector selector;
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) {
super(topic, cursor);
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
super(topic, cursor, subscription);
//TODO: Consumer selector Pluggable
selector = new HashRangeStickyKeyConsumerSelector();
}
......@@ -94,12 +97,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
if (readType == ReadType.Replay) {
subList.forEach(entry -> messagesToReplay.remove(entry.getLedgerId(), entry.getEntryId()));
}
final SendMessageInfo sentMsgInfo = consumer.sendMessages(subList);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(subList.size());
filterEntriesForConsumer(subList, batchSizes, sendMessageInfo);
consumer.sendMessages(subList, batchSizes, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
entriesWithSameKey.getValue().removeAll(subList);
final long msgSent = sentMsgInfo.getTotalSentMessages();
totalAvailablePermits -= msgSent;
totalMessagesSent += sentMsgInfo.getTotalSentMessages();
totalBytesSent += sentMsgInfo.getTotalSentMessageBytes();
totalAvailablePermits -= sendMessageInfo.getTotalMessages();
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
if (entriesWithSameKey.getValue().size() == 0) {
iterator.remove();
......
......@@ -140,12 +140,12 @@ public class PersistentSubscription implements Subscription {
switch (consumer.subType()) {
case Exclusive:
if (dispatcher == null || dispatcher.getType() != SubType.Exclusive) {
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0, topic);
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Exclusive, 0, topic, this);
}
break;
case Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor);
dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, this);
}
break;
case Failover:
......@@ -157,12 +157,12 @@ public class PersistentSubscription implements Subscription {
if (dispatcher == null || dispatcher.getType() != SubType.Failover) {
dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover, partitionIndex,
topic);
topic, this);
}
break;
case Key_Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor);
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this);
}
break;
default:
......
......@@ -267,7 +267,7 @@ public class PersistentDispatcherFailoverConsumerTest {
int partitionIndex = 0;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic);
SubType.Failover, partitionIndex, topic, sub);
// 1. Verify no consumers connected
assertFalse(pdfc.isConsumerConnected());
......@@ -306,7 +306,7 @@ public class PersistentDispatcherFailoverConsumerTest {
int partitionIndex = 0;
PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
SubType.Failover, partitionIndex, topic);
SubType.Failover, partitionIndex, topic, sub);
// 1. Verify no consumers connected
assertFalse(pdfc.isConsumerConnected());
......@@ -421,7 +421,7 @@ public class PersistentDispatcherFailoverConsumerTest {
public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
......@@ -465,7 +465,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerSamePriority() throws Exception{
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
......@@ -492,7 +492,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerDifferentPriority() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(0, 2, false, 1);
Consumer consumer2 = createConsumer(0, 2, false, 2);
Consumer consumer3 = createConsumer(0, 2, false, 3);
......@@ -546,7 +546,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerDifferentPriority2() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(0, 2, true, 1);
Consumer consumer2 = createConsumer(0, 2, true, 2);
Consumer consumer3 = createConsumer(0, 2, true, 3);
......
......@@ -285,7 +285,7 @@ public class PersistentTopicTest {
PersistentTopic topic = spy(new PersistentTopic(successTopicName, ledgerMock, brokerService));
ManagedCursor cursor = mock(ManagedCursor.class);
when(cursor.getName()).thenReturn("cursor");
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, null);
dispatcher.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
verify(topic, atLeast(1)).getBrokerService();
}
......@@ -296,7 +296,7 @@ public class PersistentTopicTest {
ManagedCursor cursor = mock(ManagedCursor.class);
when(cursor.getName()).thenReturn("cursor");
PersistentDispatcherSingleActiveConsumer dispatcher = new PersistentDispatcherSingleActiveConsumer(cursor,
SubType.Exclusive, 1, topic);
SubType.Exclusive, 1, topic, null);
Consumer consumer = mock(Consumer.class);
dispatcher.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), consumer);
verify(topic, atLeast(1)).getBrokerService();
......
......@@ -37,6 +37,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
......@@ -92,6 +95,8 @@ import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
@UtilityClass
@Slf4j
public class Commands {
// default message size for transfer
......@@ -1214,6 +1219,33 @@ public class Commands {
return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload);
}
public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload, String subscription,
long consumerId) {
MessageMetadata msgMetadata = peekMessageMetadata(metadataAndPayload, subscription, consumerId);
if (msgMetadata == null) {
return -1;
} else {
int numMessagesInBatch = msgMetadata.getNumMessagesInBatch();
msgMetadata.recycle();
return numMessagesInBatch;
}
}
public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, String subscription,
long consumerId) {
try {
// save the reader index and restore after parsing
int readerIdx = metadataAndPayload.readerIndex();
PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);
return metadata;
} catch (Throwable t) {
log.error("[{}] [{}] Failed to parse message metadata", subscription, consumerId, t);
return null;
}
}
public static int getCurrentProtocolVersion() {
// Return the last ProtocolVersion enum value
return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册