提交 c94fc4fa 编写于 作者: Y yukon

[ROCKETMQ-311] Add a swith for broker fast failure and support pull request queue

上级 cba30897
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package org.apache.rocketmq.broker.latency; package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -27,6 +28,10 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; ...@@ -27,6 +28,10 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and
* {@link BrokerController#pullThreadPoolQueue}
*/
public class BrokerFastFailure { public class BrokerFastFailure {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
...@@ -52,7 +57,9 @@ public class BrokerFastFailure { ...@@ -52,7 +57,9 @@ public class BrokerFastFailure {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override @Override
public void run() { public void run() {
cleanExpiredRequest(); if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
cleanExpiredRequest();
}
} }
}, 1000, 10, TimeUnit.MILLISECONDS); }, 1000, 10, TimeUnit.MILLISECONDS);
} }
...@@ -75,10 +82,18 @@ public class BrokerFastFailure { ...@@ -75,10 +82,18 @@ public class BrokerFastFailure {
} }
} }
cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) { while (true) {
try { try {
if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) { if (!blockingQueue.isEmpty()) {
final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek(); final Runnable runnable = blockingQueue.peek();
if (null == runnable) { if (null == runnable) {
break; break;
} }
...@@ -88,10 +103,10 @@ public class BrokerFastFailure { ...@@ -88,10 +103,10 @@ public class BrokerFastFailure {
} }
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) { if (behind >= maxWaitTimeMillsInQueue) {
if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) { if (blockingQueue.remove(runnable)) {
rt.setStopRun(true); rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size())); rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
} }
} else { } else {
break; break;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class BrokerFastFailureTest {
@Test
public void testCleanExpiredRequestInQueue() throws Exception {
BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
assertThat(queue.size()).isZero();
//Normal Runnable
Runnable runnable = new Runnable() {
@Override
public void run() {
}
};
queue.add(runnable);
assertThat(queue.size()).isEqualTo(1);
brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
assertThat(queue.size()).isEqualTo(1);
queue.clear();
//With expired request
RequestTask expiredRequest = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(expiredRequest, null));
TimeUnit.MILLISECONDS.sleep(100);
RequestTask requestTask = new RequestTask(runnable, null, null);
queue.add(new FutureTaskExt<>(requestTask, null));
assertThat(queue.size()).isEqualTo(2);
brokerFastFailure.cleanExpiredRequestInQueue(queue, 100);
assertThat(queue.size()).isEqualTo(1);
assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask);
}
}
\ No newline at end of file
...@@ -103,7 +103,9 @@ public class BrokerConfig { ...@@ -103,7 +103,9 @@ public class BrokerConfig {
private boolean disableConsumeIfConsumerReadSlowly = false; private boolean disableConsumeIfConsumerReadSlowly = false;
private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16; private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
private boolean brokerFastFailureEnable = true;
private long waitTimeMillsInSendQueue = 200; private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
private long startAcceptSendRequestTimeStamp = 0L; private long startAcceptSendRequestTimeStamp = 0L;
...@@ -160,6 +162,22 @@ public class BrokerConfig { ...@@ -160,6 +162,22 @@ public class BrokerConfig {
this.consumerFallbehindThreshold = consumerFallbehindThreshold; this.consumerFallbehindThreshold = consumerFallbehindThreshold;
} }
public boolean isBrokerFastFailureEnable() {
return brokerFastFailureEnable;
}
public void setBrokerFastFailureEnable(final boolean brokerFastFailureEnable) {
this.brokerFastFailureEnable = brokerFastFailureEnable;
}
public long getWaitTimeMillsInPullQueue() {
return waitTimeMillsInPullQueue;
}
public void setWaitTimeMillsInPullQueue(final long waitTimeMillsInPullQueue) {
this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue;
}
public boolean isDisableConsumeIfConsumerReadSlowly() { public boolean isDisableConsumeIfConsumerReadSlowly() {
return disableConsumeIfConsumerReadSlowly; return disableConsumeIfConsumerReadSlowly;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册