提交 6e31d864 编写于 作者: Y yukon

[ROCKETMQ-22] Resolve ClassCastException issue in printWaterMark.

上级 1562bd0d
...@@ -463,7 +463,7 @@ public class BrokerController { ...@@ -463,7 +463,7 @@ public class BrokerController {
final Runnable peek = q.peek(); final Runnable peek = q.peek();
if (peek != null) { if (peek != null) {
RequestTask rt = BrokerFastFailure.castRunnable(peek); RequestTask rt = BrokerFastFailure.castRunnable(peek);
slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp(); slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
} }
if (slowTimeMills < 0) if (slowTimeMills < 0)
......
...@@ -71,7 +71,7 @@ public class BrokerFastFailure { ...@@ -71,7 +71,7 @@ public class BrokerFastFailure {
} else { } else {
break; break;
} }
} catch (Throwable e) { } catch (Throwable ignored) {
} }
} }
...@@ -99,7 +99,7 @@ public class BrokerFastFailure { ...@@ -99,7 +99,7 @@ public class BrokerFastFailure {
} else { } else {
break; break;
} }
} catch (Throwable e) { } catch (Throwable ignored) {
} }
} }
} }
......
...@@ -130,7 +130,7 @@ public class PullRequestHoldService extends ServiceThread { ...@@ -130,7 +130,7 @@ public class PullRequestHoldService extends ServiceThread {
if (newestOffset > request.getPullFromThisOffset()) { if (newestOffset > request.getPullFromThisOffset()) {
if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
try { try {
this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand()); request.getRequestCommand());
} catch (Throwable e) { } catch (Throwable e) {
log.error("execute request when wakeup failed.", e); log.error("execute request when wakeup failed.", e);
...@@ -141,7 +141,7 @@ public class PullRequestHoldService extends ServiceThread { ...@@ -141,7 +141,7 @@ public class PullRequestHoldService extends ServiceThread {
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try { try {
this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand()); request.getRequestCommand());
} catch (Throwable e) { } catch (Throwable e) {
log.error("execute request when wakeup failed.", e); log.error("execute request when wakeup failed.", e);
......
...@@ -50,6 +50,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; ...@@ -50,6 +50,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
...@@ -481,7 +482,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { ...@@ -481,7 +482,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
} }
} }
public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
Runnable run = new Runnable() { Runnable run = new Runnable() {
@Override @Override
public void run() { public void run() {
...@@ -513,8 +514,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { ...@@ -513,8 +514,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
} }
} }
}; };
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
this.brokerController.getPullMessageExecutor().submit(run);
} }
public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) { public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.api;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerTestHarness;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.junit.Assert;
import org.junit.Test;
public class BrokerFastFailureTest extends BrokerTestHarness {
@Test
public void testHeadSlowTimeMills() throws InterruptedException {
BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
blockingQueue.add(new FutureTaskExt<>(new RequestTask(null, null, null), null));
TimeUnit.MILLISECONDS.sleep(10);
Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) > 0);
blockingQueue.clear();
blockingQueue.add(new Runnable() {
@Override public void run() {
}
});
Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) == 0);
}
@Test
public void testCastRunnable() {
Runnable runnable = new Runnable() {
@Override public void run() {
}
};
Assert.assertNull(BrokerFastFailure.castRunnable(runnable));
RequestTask requestTask = new RequestTask(null, null, null);
runnable = new FutureTaskExt<>(requestTask, null);
Assert.assertEquals(requestTask, BrokerFastFailure.castRunnable(runnable));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册