提交 df93a426 编写于 作者: R Rajan Dhabalia 提交者: GitHub

add msgAck and redelivery debug-logs And add topic-name in debug-logs (#714)

* Add msgAck and redelivery debug-logs And add topic-name in debug-logs
上级 3751bafd
......@@ -65,6 +65,7 @@ public class Consumer {
private final SubType subType;
private final ServerCnx cnx;
private final String appId;
private final String topicName;
private final long consumerId;
private final int priorityLevel;
......@@ -95,11 +96,12 @@ public class Consumer {
private volatile int unackedMessages = 0;
private volatile boolean blockedConsumerOnUnackedMsgs = false;
public Consumer(Subscription subscription, SubType subType, long consumerId, int priorityLevel, String consumerName,
public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException {
this.subscription = subscription;
this.subType = subType;
this.topicName = topicName;
this.consumerId = consumerId;
this.priorityLevel = priorityLevel;
this.consumerName = consumerName;
......@@ -151,8 +153,8 @@ public class Consumer {
sentMessages.channelPromse = writePromise;
if (entries.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}] List of messages is empty, triggering write future immediately for consumerId {}",
subscription, consumerId);
log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}",
topicName, subscription, consumerId);
}
writePromise.setSuccess();
sentMessages.totalSentMessages = 0;
......@@ -192,8 +194,8 @@ public class Consumer {
}
if (log.isDebugEnabled()) {
log.debug("[{}] Sending message to consumerId {}, entry id {}", subscription, consumerId,
pos.getEntryId());
log.debug("[{}-{}] Sending message to consumerId {}, entry id {}", topicName, subscription,
consumerId, pos.getEntryId());
}
// We only want to pass the "real" promise on the last entry written
......@@ -273,7 +275,8 @@ public class Consumer {
}
if (permits < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
log.debug("[{}-{}] [{}] message permits dropped below 0 - {}", topicName, subscription, consumerId,
permits);
}
}
......@@ -363,8 +366,8 @@ public class Consumer {
}
if (log.isDebugEnabled()) {
log.debug("[{}] Added more flow control message permits {} (old was: {})", this, additionalNumberOfMessages,
oldPermits);
log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = ", topicName,
subscription, additionalNumberOfMessages, oldPermits, blockedConsumerOnUnackedMsgs);
}
}
......@@ -497,6 +500,9 @@ public class Consumer {
if (ackOwnedConsumer != null) {
int totalAckedMsgs = (int) ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()).first;
ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId());
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (((addAndGetUnAckedMsgs(ackOwnedConsumer, -totalAckedMsgs) <= (maxUnackedMessages / 2))
......@@ -520,6 +526,9 @@ public class Consumer {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
clearUnAckedMsgs(this);
blockedConsumerOnUnackedMsgs = false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
}
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
flowConsumerBlockedPermits(this);
......@@ -550,6 +559,11 @@ public class Consumer {
addAndGetUnAckedMsgs(this, -totalRedeliveryMessages);
blockedConsumerOnUnackedMsgs = false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", topicName, subscription, consumerId,
totalRedeliveryMessages, pendingPositions.size());
}
subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
......
......@@ -625,7 +625,12 @@ public class ServerCnx extends PulsarHandler {
CompletableFuture<Consumer> consumerFuture = consumers.get(flow.getConsumerId());
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
consumerFuture.getNow(null).flowPermits(flow.getMessagePermits());
Consumer consumer = consumerFuture.getNow(null);
if (consumer != null) {
consumer.flowPermits(flow.getMessagePermits());
} else {
log.info("[{}] Couldn't find consumer {}", remoteAddress, flow.getConsumerId());
}
}
}
......
......@@ -321,7 +321,7 @@ public class NonPersistentTopic implements Topic {
name -> new NonPersistentSubscription(this, subscriptionName));
try {
Consumer consumer = new Consumer(subscription, subType, consumerId, priorityLevel, consumerName, 0, cnx,
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole());
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
......
......@@ -163,7 +163,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
totalAvailablePermits += additionalNumberOfMessages;
if (log.isDebugEnabled()) {
log.debug("[{}] Trigger new read after receiving flow control message", consumer);
log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {}", name, consumer,
totalAvailablePermits);
}
readMoreEntries();
}
......@@ -331,6 +332,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
Consumer c = getNextConsumer();
if (c == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
return;
......@@ -460,7 +462,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
messagesToReplay.add(ledgerId, entryId);
});
if (log.isDebugEnabled()) {
log.debug("[{}] Redelivering unacknowledged messages for consumer {}", consumer, messagesToReplay);
log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, messagesToReplay);
}
readMoreEntries();
}
......@@ -469,7 +471,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
if (log.isDebugEnabled()) {
log.debug("[{}] Redelivering unacknowledged messages for consumer {}", consumer, positions);
log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
}
readMoreEntries();
}
......
......@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,6 +45,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
private final PersistentTopic topic;
private final ManagedCursor cursor;
private final String name;
private boolean havePendingRead = false;
......@@ -55,6 +57,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
PersistentTopic topic) {
super(subscriptionType, partitionIndex, topic.getName());
this.topic = topic;
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.cursor = cursor;
this.readBatchSize = MaxReadBatchSize;
}
......@@ -82,7 +86,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
public synchronized void readEntriesComplete(final List<Entry> entries, Object obj) {
Consumer readConsumer = (Consumer) obj;
if (log.isDebugEnabled()) {
log.debug("[{}] Got messages: {}", readConsumer, entries.size());
log.debug("[{}-{}] Got messages: {}", name, readConsumer, entries.size());
}
havePendingRead = false;
......@@ -90,7 +94,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
if (readBatchSize < MaxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, MaxReadBatchSize);
if (log.isDebugEnabled()) {
log.debug("[{}] Increasing read batch size from {} to {}", readConsumer, readBatchSize,
log.debug("[{}-{}] Increasing read batch size from {} to {}", name, readConsumer, readBatchSize,
newReadBatchSize);
}
......@@ -103,6 +107,9 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
if (currentConsumer == null || readConsumer != currentConsumer) {
// Active consumer has changed since the read request has been issued. We need to rewind the cursor and
// re-issue the read request for the new consumer
if (log.isDebugEnabled()) {
log.debug("[{}] rewind because no available consumer found", name);
}
entries.forEach(Entry::release);
cursor.rewind();
if (currentConsumer != null) {
......@@ -124,8 +131,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
} else {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}",
newConsumer, newConsumer != null, havePendingRead);
"[{}-{}] Ignoring write future complete. consumerAvailable={} havePendingRead={}",
name, newConsumer, newConsumer != null, havePendingRead);
}
}
}
......@@ -139,18 +146,18 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
if (!havePendingRead) {
if (ACTIVE_CONSUMER_UPDATER.get(this) == consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Trigger new read after receiving flow control message", consumer);
log.debug("[{}-{}] Trigger new read after receiving flow control message", name, consumer);
}
readMoreEntries(consumer);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Ignoring flow control message since consumer is not active partition consumer",
consumer);
log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer",
name, consumer);
}
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Ignoring flow control message since we already have a pending read req", consumer);
log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", name, consumer);
}
}
}
......@@ -168,11 +175,12 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
if (!havePendingRead) {
cursor.rewind();
if (log.isDebugEnabled()) {
log.debug("[{}] Cursor rewinded, redelivering unacknowledged messages. ", consumer);
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
}
readMoreEntries(consumer);
} else {
log.info("[{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", consumer);
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", name,
consumer);
}
}
......@@ -207,7 +215,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
if (!rateLimiter.hasMessageDispatchPermit()) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded message-rate {}/{}, schedule after a {}",
topic.getName(), rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(),
name, rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
topic.getBrokerService().executor().schedule(() -> {
......@@ -235,13 +243,13 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
// Schedule read
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages", consumer, messagesToRead);
log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer buffer is full, pause reading", consumer);
log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
}
}
}
......@@ -261,12 +269,12 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
consumers.forEach(Consumer::reachedEndOfTopic);
}
} else if (!(exception instanceof TooManyRequestsException)) {
log.error("[{}] Error reading entries at {} : {} - Retrying to read in {} seconds", c,
log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", name, c,
cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds", c,
cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0);
log.debug("[{}-{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds",
name, c, cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0);
}
}
......@@ -281,12 +289,12 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
// we should retry the read if we have an active consumer and there is no pending read
if (currentConsumer != null && !havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Retrying read operation", c);
log.debug("[{}-{}] Retrying read operation", name, c);
}
readMoreEntries(currentConsumer);
} else {
log.info("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}", c, currentConsumer,
havePendingRead);
log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", name, c,
currentConsumer, havePendingRead);
}
}
}, waitTimeMillis, TimeUnit.MILLISECONDS);
......
......@@ -423,7 +423,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})",
topic, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(),
topicName, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(),
exception);
}
}
......
......@@ -404,7 +404,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
subscriptionFuture.thenAccept(subscription -> {
try {
Consumer consumer = new Consumer(subscription, subType, consumerId, priorityLevel, consumerName,
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole());
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
......
......@@ -207,8 +207,8 @@ public class PersistentDispatcherFailoverConsumerTest {
assertFalse(pdfc.isConsumerConnected());
// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1");
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0,
"Cons1"/* consumer name */, 50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
......@@ -224,7 +224,7 @@ public class PersistentDispatcherFailoverConsumerTest {
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, 0, "Cons2"/* consumer name */,
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
......@@ -232,8 +232,8 @@ public class PersistentDispatcherFailoverConsumerTest {
assertEquals(3, consumers.size());
// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 0 /* consumer id */, 0, "Cons0"/* consumer name */,
50000, serverCnx, "myrole-1");
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0,
"Cons0"/* consumer name */, 50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
......@@ -440,7 +440,7 @@ public class PersistentDispatcherFailoverConsumerTest {
}
private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
Consumer consumer = new Consumer(null, SubType.Shared, id, priority, ""+id, 5000, serverCnx, "appId");
Consumer consumer = new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000, serverCnx, "appId");
try {
consumer.flowPermits(permit);
} catch (Exception e) {
......
......@@ -409,7 +409,7 @@ public class PersistentTopicTest {
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
// 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, 0, "Cons1"/* consumer name */,
Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1");
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
......@@ -439,7 +439,7 @@ public class PersistentTopicTest {
public void testUbsubscribeRaceConditions() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, 0, "Cons1"/* consumer name */,
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1");
sub.addConsumer(consumer1);
......@@ -461,7 +461,7 @@ public class PersistentTopicTest {
try {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, 0, "Cons2"/* consumer name */,
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1");
} catch (BrokerServiceException e) {
assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册