提交 bea1087f 编写于 作者: R Raj 提交者: Matteo Merli

Fix msg-acknowledge counter at client-stats (#36)

上级 d7d51632
......@@ -71,6 +71,8 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
if (ackTimeoutSec > 0) {
conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS);
}
......@@ -107,7 +109,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
Thread.sleep(2000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer);
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
log.info("-- Exiting {} test --", methodName);
}
......@@ -115,6 +117,8 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
conf.setSubscriptionType(SubscriptionType.Exclusive);
if (ackTimeoutSec > 0) {
conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS);
......@@ -165,7 +169,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
Thread.sleep(2000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer);
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
log.info("-- Exiting {} test --", methodName);
}
......@@ -174,6 +178,8 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
// Cumulative Ack-counter works if ackTimeOutTimer-task is enabled
boolean isAckTimeoutTaskEnabledForCumulativeAck = ackTimeoutSec > 0;
conf.setSubscriptionType(SubscriptionType.Exclusive);
if (ackTimeoutSec > 0) {
conf.setAckTimeout(ackTimeoutSec, TimeUnit.SECONDS);
......@@ -224,7 +230,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
Thread.sleep(5000);
consumer.close();
producer.close();
validatingLogInfo(consumer, producer);
validatingLogInfo(consumer, producer, isAckTimeoutTaskEnabledForCumulativeAck);
log.info("-- Exiting {} test --", methodName);
}
......@@ -232,6 +238,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
public void testMessageListener(int batchMessageDelayMs) throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setAckTimeout(100, TimeUnit.SECONDS);
conf.setSubscriptionType(SubscriptionType.Exclusive);
int numMessages = 100;
......@@ -272,7 +279,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
assertEquals(latch.await(numMessages, TimeUnit.SECONDS), true, "Timed out waiting for message listener acks");
consumer.close();
producer.close();
validatingLogInfo(consumer, producer);
validatingLogInfo(consumer, producer, true);
log.info("-- Exiting {} test --", methodName);
}
......@@ -324,7 +331,7 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
public void validatingLogInfo(Consumer consumer, Producer producer) throws InterruptedException {
public void validatingLogInfo(Consumer consumer, Producer producer, boolean verifyAckCount) throws InterruptedException {
// Waiting for recording last stat info
Thread.sleep(1000);
ConsumerStats cStat = consumer.getStats();
......@@ -332,7 +339,9 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
assertEquals(pStat.getTotalMsgsSent(), cStat.getTotalMsgsReceived());
assertEquals(pStat.getTotalBytesSent(), cStat.getTotalBytesReceived());
assertEquals(pStat.getTotalMsgsSent(), pStat.getTotalAcksReceived());
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
if (verifyAckCount) {
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
}
}
}
......@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.BitSet;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -287,6 +289,7 @@ public class ConsumerImpl extends ConsumerBase {
return true;
}
int batchIndex = batchMessageId.getBatchIndex();
int batchSize = bitSet.length();
if (ackType == AckType.Individual) {
bitSet.clear(batchIndex);
} else {
......@@ -303,6 +306,11 @@ public class ConsumerImpl extends ConsumerBase {
batchMessageAckTracker.keySet().removeIf(m -> (m.compareTo(message) <= 0));
}
batchMessageAckTracker.remove(message);
// increment Acknowledge-msg counter with number of messages in batch only if AckType is Individual.
// CumulativeAckType is handled while sending ack to broker
if (ackType == AckType.Individual) {
stats.incrementNumAcksSent(batchSize);
}
return true;
} else {
// we cannot ack this message to broker. but prior message may be ackable
......@@ -396,16 +404,17 @@ public class ConsumerImpl extends ConsumerBase {
if (unAckedMessageTracker != null) {
unAckedMessageTracker.remove(msgId);
}
stats.incrementNumAcksSent(stats.getNumAcksTrackerSumThenReset());
// increment counter by 1 for non-batch msg
if (!(messageId instanceof BatchMessageIdImpl)) {
stats.incrementNumAcksSent(1);
}
} else if (ackType == AckType.Cumulative) {
if (unAckedMessageTracker != null) {
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
} else {
stats.incrementNumAcksSent(stats.getNumAcksTrackerSumThenReset());
int ackedMessages = unAckedMessageTracker.removeMessagesTill(msgId);
stats.incrementNumAcksSent(ackedMessages);
}
}
ackFuture.complete(null);
stats.resetNumAckTracker();
} else {
stats.incrementNumAcksFailed();
ackFuture.completeExceptionally(new PulsarClientException(future.cause()));
......@@ -638,8 +647,6 @@ public class ConsumerImpl extends ConsumerBase {
msgMetadata.recycle();
}
stats.incrementNumAcksTracker(numMessages);
if (listener != null) {
// Trigger the notification on the message listener in a separate thread to avoid blocking the networking
// thread while the message processing happens
......
......@@ -47,7 +47,6 @@ public class ConsumerStats implements Serializable {
private final LongAdder numReceiveFailed;
private final LongAdder numAcksSent;
private final LongAdder numAcksFailed;
private final LongAdder numAcksTracker;
private final LongAdder totalMsgsReceived;
private final LongAdder totalBytesReceived;
private final LongAdder totalReceiveFailed;
......@@ -64,7 +63,6 @@ public class ConsumerStats implements Serializable {
numReceiveFailed = null;
numAcksSent = null;
numAcksFailed = null;
numAcksTracker = null;
totalMsgsReceived = null;
totalBytesReceived = null;
totalReceiveFailed = null;
......@@ -82,7 +80,6 @@ public class ConsumerStats implements Serializable {
numReceiveFailed = new LongAdder();
numAcksSent = new LongAdder();
numAcksFailed = new LongAdder();
numAcksTracker = new LongAdder();
totalMsgsReceived = new LongAdder();
totalBytesReceived = new LongAdder();
totalReceiveFailed = new LongAdder();
......@@ -156,10 +153,6 @@ public class ConsumerStats implements Serializable {
}
}
void incrementNumAcksSent() {
numAcksSent.increment();
}
void incrementNumAcksSent(long numAcks) {
numAcksSent.add(numAcks);
}
......@@ -176,18 +169,6 @@ public class ConsumerStats implements Serializable {
return statTimeout;
}
void resetNumAckTracker() {
numAcksTracker.reset();
}
void incrementNumAcksTracker(final int numMessages) {
numAcksTracker.add(numMessages);
}
long getNumAcksTrackerSumThenReset() {
return numAcksTracker.sumThenReset();
}
void reset() {
numMsgsReceived.reset();
numBytesReceived.reset();
......
......@@ -30,11 +30,6 @@ public class ConsumerStatsDisabled extends ConsumerStats {
// Do nothing
}
@Override
void incrementNumAcksSent() {
// Do nothing
}
@Override
void incrementNumAcksSent(long numAcks) {
// Do nothing
......@@ -45,20 +40,4 @@ public class ConsumerStatsDisabled extends ConsumerStats {
// Do nothing
}
@Override
void resetNumAckTracker() {
// Do nothing
}
@Override
void incrementNumAcksTracker(final int numMessages) {
// Do nothing
}
@Override
long getNumAcksTrackerSumThenReset() {
// Do nothing
return -1;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册