From 6e31d864e3f49b1296bad2e24955bee4d918d31d Mon Sep 17 00:00:00 2001 From: yukon Date: Mon, 9 Jan 2017 21:58:38 +0800 Subject: [PATCH] [ROCKETMQ-22] Resolve ClassCastException issue in printWaterMark. --- .../rocketmq/broker/BrokerController.java | 2 +- .../broker/latency/BrokerFastFailure.java | 4 +- .../longpolling/PullRequestHoldService.java | 4 +- .../processor/PullMessageProcessor.java | 6 +- .../broker/api/BrokerFastFailureTest.java | 61 +++++++++++++++++++ 5 files changed, 69 insertions(+), 8 deletions(-) create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9b89c85c..af69001c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -463,7 +463,7 @@ public class BrokerController { final Runnable peek = q.peek(); if (peek != null) { RequestTask rt = BrokerFastFailure.castRunnable(peek); - slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp(); + slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp(); } if (slowTimeMills < 0) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index d7d12769..a2a1aa09 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -71,7 +71,7 @@ public class BrokerFastFailure { } else { break; } - } catch (Throwable e) { + } catch (Throwable ignored) { } } @@ -99,7 +99,7 @@ public class BrokerFastFailure { } else { break; } - } catch (Throwable e) { + } catch (Throwable ignored) { } } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index ff068d26..fdba50da 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -130,7 +130,7 @@ public class PullRequestHoldService extends ServiceThread { if (newestOffset > request.getPullFromThisOffset()) { if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); @@ -141,7 +141,7 @@ public class PullRequestHoldService extends ServiceThread { if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 382030be..7d158943 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -481,7 +482,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } - public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { + public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { @Override public void run() { @@ -513,8 +514,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } }; - - this.brokerController.getPullMessageExecutor().submit(run); + this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); } public void registerConsumeMessageHook(List sendMessageHookList) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java new file mode 100644 index 00000000..bec0af5a --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.api; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.BrokerTestHarness; +import org.apache.rocketmq.broker.latency.BrokerFastFailure; +import org.apache.rocketmq.broker.latency.FutureTaskExt; +import org.apache.rocketmq.remoting.netty.RequestTask; +import org.junit.Assert; +import org.junit.Test; + +public class BrokerFastFailureTest extends BrokerTestHarness { + + @Test + public void testHeadSlowTimeMills() throws InterruptedException { + BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); + blockingQueue.add(new FutureTaskExt<>(new RequestTask(null, null, null), null)); + TimeUnit.MILLISECONDS.sleep(10); + Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) > 0); + + blockingQueue.clear(); + blockingQueue.add(new Runnable() { + @Override public void run() { + + } + }); + Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) == 0); + } + + @Test + public void testCastRunnable() { + Runnable runnable = new Runnable() { + @Override public void run() { + + } + }; + Assert.assertNull(BrokerFastFailure.castRunnable(runnable)); + + RequestTask requestTask = new RequestTask(null, null, null); + runnable = new FutureTaskExt<>(requestTask, null); + + Assert.assertEquals(requestTask, BrokerFastFailure.castRunnable(runnable)); + } +} -- GitLab