diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9b89c85c14347cd28eda33244714a50d7decff29..af69001c3dec83fe624fe6f6f03e5d357d6b849c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -463,7 +463,7 @@ public class BrokerController { final Runnable peek = q.peek(); if (peek != null) { RequestTask rt = BrokerFastFailure.castRunnable(peek); - slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp(); + slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp(); } if (slowTimeMills < 0) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index d7d127699e5d6b82e5c8f9799758dacb6836627b..a2a1aa095f727593494aef799a20b165a69187db 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -71,7 +71,7 @@ public class BrokerFastFailure { } else { break; } - } catch (Throwable e) { + } catch (Throwable ignored) { } } @@ -99,7 +99,7 @@ public class BrokerFastFailure { } else { break; } - } catch (Throwable e) { + } catch (Throwable ignored) { } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index ff068d26b303723ab4cf4bf5c5801ea6e71d0068..fdba50daaf6d8a320efbfb29d8ec4aa58229dce0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -130,7 +130,7 @@ public class PullRequestHoldService extends ServiceThread { if (newestOffset > request.getPullFromThisOffset()) { if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); @@ -141,7 +141,7 @@ public class PullRequestHoldService extends ServiceThread { if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 382030be47835ad1cff7e758b0d99173554dde51..7d15894353448c8e3d05d4cbabc73186303ad7a6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -481,7 +482,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } - public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { + public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { @Override public void run() { @@ -513,8 +514,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } }; - - this.brokerController.getPullMessageExecutor().submit(run); + this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); } public void registerConsumeMessageHook(List sendMessageHookList) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java new file mode 100644 index 0000000000000000000000000000000000000000..bec0af5a0d6dac4feb961439add24227d689b5c2 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.api; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.BrokerTestHarness; +import org.apache.rocketmq.broker.latency.BrokerFastFailure; +import org.apache.rocketmq.broker.latency.FutureTaskExt; +import org.apache.rocketmq.remoting.netty.RequestTask; +import org.junit.Assert; +import org.junit.Test; + +public class BrokerFastFailureTest extends BrokerTestHarness { + + @Test + public void testHeadSlowTimeMills() throws InterruptedException { + BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); + blockingQueue.add(new FutureTaskExt<>(new RequestTask(null, null, null), null)); + TimeUnit.MILLISECONDS.sleep(10); + Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) > 0); + + blockingQueue.clear(); + blockingQueue.add(new Runnable() { + @Override public void run() { + + } + }); + Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) == 0); + } + + @Test + public void testCastRunnable() { + Runnable runnable = new Runnable() { + @Override public void run() { + + } + }; + Assert.assertNull(BrokerFastFailure.castRunnable(runnable)); + + RequestTask requestTask = new RequestTask(null, null, null); + runnable = new FutureTaskExt<>(requestTask, null); + + Assert.assertEquals(requestTask, BrokerFastFailure.castRunnable(runnable)); + } +}