未验证 提交 6854b007 编写于 作者: A Addison Higham 提交者: GitHub

[broker] Increase timeout for loading topics (#6750)

In #6489, a timeout was introduced to make sure calls into the
BrokerService finish or error out. However, this timeout is too low by
default when loading topics that have many replicated clusters.

Loading replicated topics is quite an expensive operation, involve
global ZK lookups and the start of many sub-processes. While we would
hope it finishes in 60 seconds we want to safe.

Long term, it may make sense to break out this operation into more
steps where each step can have it's own timeout
Co-authored-by: NAddison Higham <ahigham@instructure.com>
上级 f59a3703
......@@ -224,6 +224,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Flag to skip broker shutdown when broker handles Out of memory error"
)
private boolean skipBrokerShutdownOnOOM = false;
@FieldContext(
category = CATEGORY_SERVER,
doc = "Amount of seconds to timeout when loading a topic. In situations with many geo-replicated clusters, this may need raised."
)
private long topicLoadTimeoutSeconds = 60;
@FieldContext(
category = CATEGORY_POLICIES,
......
......@@ -861,7 +861,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic,
boolean createIfMissing) throws RuntimeException {
checkTopicNsOwnership(topic);
final CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();
final CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline(pulsar.getConfiguration().getTopicLoadTimeoutSeconds(),
TimeUnit.SECONDS, new TimeoutException("Failed to load topic within timeout"));
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册