diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index 2d4bedcb96e76ba1f723287d2c110f80470722ff..6aefe81a1139ef8434203d870ae054a779d89196 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.broker.latency; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -27,6 +28,10 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and + * {@link BrokerController#pullThreadPoolQueue} + */ public class BrokerFastFailure { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( @@ -52,7 +57,9 @@ public class BrokerFastFailure { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { - cleanExpiredRequest(); + if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) { + cleanExpiredRequest(); + } } }, 1000, 10, TimeUnit.MILLISECONDS); } @@ -75,10 +82,18 @@ public class BrokerFastFailure { } } + cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(), + this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()); + + cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(), + this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()); + } + + void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) { while (true) { try { - if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) { - final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek(); + if (!blockingQueue.isEmpty()) { + final Runnable runnable = blockingQueue.peek(); if (null == runnable) { break; } @@ -88,10 +103,10 @@ public class BrokerFastFailure { } final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); - if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) { - if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) { + if (behind >= maxWaitTimeMillsInQueue) { + if (blockingQueue.remove(runnable)) { rt.setStopRun(true); - rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size())); + rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size())); } } else { break; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java new file mode 100644 index 0000000000000000000000000000000000000000..5d0f7f9d72b5b31a8308f87c969fe83d6a41335c --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.latency; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.remoting.netty.RequestTask; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class BrokerFastFailureTest { + @Test + public void testCleanExpiredRequestInQueue() throws Exception { + BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null); + + BlockingQueue queue = new LinkedBlockingQueue<>(); + brokerFastFailure.cleanExpiredRequestInQueue(queue, 1); + assertThat(queue.size()).isZero(); + + //Normal Runnable + Runnable runnable = new Runnable() { + @Override + public void run() { + + } + }; + queue.add(runnable); + + assertThat(queue.size()).isEqualTo(1); + brokerFastFailure.cleanExpiredRequestInQueue(queue, 1); + assertThat(queue.size()).isEqualTo(1); + + queue.clear(); + + //With expired request + RequestTask expiredRequest = new RequestTask(runnable, null, null); + queue.add(new FutureTaskExt<>(expiredRequest, null)); + TimeUnit.MILLISECONDS.sleep(100); + + RequestTask requestTask = new RequestTask(runnable, null, null); + queue.add(new FutureTaskExt<>(requestTask, null)); + + assertThat(queue.size()).isEqualTo(2); + brokerFastFailure.cleanExpiredRequestInQueue(queue, 100); + assertThat(queue.size()).isEqualTo(1); + assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask); + } + +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 9a208a3fad05b3f3a555204532c13e6679b78c7f..a67fa74a3b5e45c4c20a573a0c1c8366166d36d1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -103,7 +103,9 @@ public class BrokerConfig { private boolean disableConsumeIfConsumerReadSlowly = false; private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16; + private boolean brokerFastFailureEnable = true; private long waitTimeMillsInSendQueue = 200; + private long waitTimeMillsInPullQueue = 5 * 1000; private long startAcceptSendRequestTimeStamp = 0L; @@ -160,6 +162,22 @@ public class BrokerConfig { this.consumerFallbehindThreshold = consumerFallbehindThreshold; } + public boolean isBrokerFastFailureEnable() { + return brokerFastFailureEnable; + } + + public void setBrokerFastFailureEnable(final boolean brokerFastFailureEnable) { + this.brokerFastFailureEnable = brokerFastFailureEnable; + } + + public long getWaitTimeMillsInPullQueue() { + return waitTimeMillsInPullQueue; + } + + public void setWaitTimeMillsInPullQueue(final long waitTimeMillsInPullQueue) { + this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue; + } + public boolean isDisableConsumeIfConsumerReadSlowly() { return disableConsumeIfConsumerReadSlowly; }