提交 fe687c3c 编写于 作者: M Matteo Merli 提交者: Jia Zhai

Allow to configure ack-timeout tick time (#4760)

### Motivation

After the changes in #3118, there has been a sharp increase in memory utilization for the UnAckedMessageTracker due to the time buckets being created.

This is especially true when the ack timeout is set to a larger value (e.g. 1h), where 3600 time buckets are created. This leads to using 20MB per partition even when no message is tracked.

This change allows the tick time to be configured so that applications can tune it based on their needs.

Additionally, fixed the logic that keeps creating hash maps and throwing them away at each tick time iteration, since that creates a lot of garbage and doesn't take care of the fact that the hash maps are expanding based on the required capacity (so next time they are already of the "right" size).

On a final note: the current default of 1sec seems very wasteful. Something like 10s should be more appropriate as default.

(cherry picked from commit f13af487)
上级 f14a61a4
......@@ -170,7 +170,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
* 10 seconds.
* 1 second.
* <p>
* By default, the acknowledge timeout is disabled and that means that messages delivered to a
* consumer will not be re-delivered unless the consumer crashes.
......@@ -187,6 +187,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
/**
* Define the granularity of the ack-timeout redelivery.
* <p>
* By default, the tick time is set to 1 second. Using a higher tick time
* reduces the memory overhead of tracking messages when the ack-timeout is set
* to a large value (e.g. 1 hour).
*
* @param tickTime
* the minimum precision of the ack-timeout message tracker
* @param timeUnit
* the unit in which {@code tickTime} is expressed
* @return the consumer builder instance
*/
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
/**
* Set the delay to wait before re-delivering messages that have failed to be processed.
* <p>
......@@ -386,7 +401,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* C5 1 1
* Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
* </pre>
*
*
* <b>Failover subscription</b>
* Broker selects active consumer for a failover-subscription based on consumer's priority-level and lexicographical sorting of a consumer name.
* eg:
......@@ -395,15 +410,15 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Consumer PriorityLevel Name
* C1 0 aaa
* C2 0 bbb
*
*
* 2. Active consumer = C2 : Consumer with highest priority
* Consumer PriorityLevel Name
* C1 1 aaa
* C2 0 bbb
*
*
* Partitioned-topics:
* Broker evenly assigns partitioned topics to highest priority consumers.
*
*
* </pre>
*
* @param priorityLevel the priority of this consumer
......
......@@ -58,6 +58,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
private List<ConsumerInterceptor<T>> interceptorList;
// Lower bound, in milliseconds, for the ack timeout.
// NOTE(review): the check that uses this constant is outside the visible hunk — confirm usage.
private static final long MIN_ACK_TIMEOUT_MILLIS = 1000;
// Smallest ack-timeout tick time, in milliseconds, accepted by ackTimeoutTickTime().
private static final long MIN_TICK_TIME_MILLIS = 100;
// NOTE(review): presumably the ack timeout (ms) applied when a dead-letter policy is
// configured without an explicit ack timeout — confirm against the caller.
private static final long DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER = 30000L;
......@@ -156,6 +157,14 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return this;
}
/**
 * Sets the tick duration used by the ack-timeout tracker.
 * <p>
 * The value is converted to milliseconds and must be at least
 * {@code MIN_TICK_TIME_MILLIS}; a value exactly equal to the minimum is accepted.
 *
 * @param tickTime the tick duration
 * @param timeUnit the unit in which {@code tickTime} is expressed
 * @return this builder, to allow call chaining
 */
@Override
public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit) {
    // Convert once so the value that is validated is the value that is stored.
    final long tickTimeMillis = timeUnit.toMillis(tickTime);
    // The bound is inclusive, so the message says "at least" rather than "greater than".
    // NOTE(review): checkArgument is presumably Guava's Preconditions.checkArgument
    // (throws IllegalArgumentException) — the static import is not visible here.
    checkArgument(tickTimeMillis >= MIN_TICK_TIME_MILLIS,
            "Ack timeout tick time should be at least " + MIN_TICK_TIME_MILLIS + " ms");
    conf.setTickDurationMillis(tickTimeMillis);
    return this;
}
@Override
public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit) {
checkArgument(redeliveryDelay >= 0, "redeliveryDelay needs to be >= 0");
......
......@@ -21,12 +21,15 @@ package org.apache.pulsar.client.impl;
import com.google.common.base.Preconditions;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
......@@ -40,7 +43,7 @@ public class UnAckedMessageTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);
protected final ConcurrentHashMap<MessageId, ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
protected final LinkedList<ConcurrentOpenHashSet<MessageId>> timePartitions;
protected final ArrayDeque<ConcurrentOpenHashSet<MessageId>> timePartitions;
protected final Lock readLock;
protected final Lock writeLock;
......@@ -94,6 +97,13 @@ public class UnAckedMessageTracker implements Closeable {
this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
}
/**
 * Per-thread reusable set of message ids. The timer task fetches it and clears it
 * at the start of every run instead of allocating a new set each tick.
 */
private static final FastThreadLocal<HashSet<MessageId>> TL_MESSAGE_IDS_SET =
        new FastThreadLocal<HashSet<MessageId>>() {
            @Override
            protected HashSet<MessageId> initialValue() {
                // Created lazily, the first time a thread asks for its set.
                return new HashSet<MessageId>();
            }
        };
public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs);
this.ackTimeoutMillis = ackTimeoutMillis;
......@@ -102,20 +112,21 @@ public class UnAckedMessageTracker implements Closeable {
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.messageIdPartitionMap = new ConcurrentHashMap<>();
this.timePartitions = new LinkedList<>();
this.timePartitions = new ArrayDeque<>();
int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
timePartitions.add(new ConcurrentOpenHashSet<>());
timePartitions.add(new ConcurrentOpenHashSet<>(16, 1));
}
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
Set<MessageId> messageIds = new HashSet<>();
Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
messageIds.clear();
writeLock.lock();
try {
timePartitions.addLast(new ConcurrentOpenHashSet<>());
ConcurrentOpenHashSet<MessageId> headPartition = timePartitions.removeFirst();
if (!headPartition.isEmpty()) {
log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size());
......@@ -124,6 +135,9 @@ public class UnAckedMessageTracker implements Closeable {
messageIdPartitionMap.remove(messageId);
});
}
headPartition.clear();
timePartitions.addLast(headPartition);
} finally {
writeLock.unlock();
}
......@@ -140,11 +154,7 @@ public class UnAckedMessageTracker implements Closeable {
writeLock.lock();
try {
messageIdPartitionMap.clear();
timePartitions.clear();
int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
timePartitions.add(new ConcurrentOpenHashSet<>());
}
timePartitions.forEach(tp -> tp.clear());
} finally {
writeLock.unlock();
}
......
......@@ -86,7 +86,7 @@ public class PulsarConnectorCache {
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
.setUseV2WireProtocol(true)
.setStickyReadsEnabled(true)
.setStickyReadsEnabled(false)
.setReadEntryTimeout(60);
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
}
......
......@@ -128,7 +128,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
.setStickyReadsEnabled(true)
.setStickyReadsEnabled(false)
.setUseV2WireProtocol(true);
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册