提交 b575549c 编写于 作者: K kimi

DUBBO-424 Heartbeat response不应该在业务线程处理

上级 215e1405
......@@ -44,9 +44,9 @@ public class HeaderExchangeHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);
public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP;
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;
private final ExchangeHandler handler;
......
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.exchange.support.header;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.Response;
import com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class HeartbeatHandler {
public class HeartbeatHandler implements ChannelHandlerDelegate {
static final Logger log = LoggerFactory.getLogger(HeartbeatHandler.class);
public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
private ChannelHandler handler;
public HeartbeatHandler(ChannelHandler handler) {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate)handler).getHandler();
}
return handler;
}
public void connected(Channel channel) throws RemotingException {
setReadTimestamp(channel);
setWriteTimestamp(channel);
handler.connected(channel);
}
public void disconnected(Channel channel) throws RemotingException {
clearReadTimestamp(channel);
clearWriteTimestamp(channel);
handler.disconnected(channel);
}
public void sent(Channel channel, Object message) throws RemotingException {
setWriteTimestamp(channel);
handler.sent(channel, message);
}
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (log.isDebugEnabled()) {
log.debug(
new StringBuilder(32)
.append("Receive heartbeat request and send heartbeat in thread ")
.append(Thread.currentThread().getName())
.toString());
}
}
return;
}
if (isHeartbeatResponse(message)) {
if (log.isDebugEnabled()) {
log.debug(
new StringBuilder(32)
.append("Receive heartbeat response in thread ")
.append(Thread.currentThread().getName())
.toString());
}
return;
}
handler.received(channel, message);
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}
private void setReadTimestamp(Channel channel) {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
}
private void setWriteTimestamp(Channel channel) {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
}
private void clearReadTimestamp(Channel channel) {
channel.removeAttribute(KEY_READ_TIMESTAMP);
}
private void clearWriteTimestamp(Channel channel) {
channel.removeAttribute(KEY_WRITE_TIMESTAMP);
}
private boolean isHeartbeatRequest(Object message) {
return message instanceof Request && ((Request) message).isHeartbeat();
}
private boolean isHeartbeatResponse(Object message) {
return message instanceof Response && ((Response)message).isHeartbeat();
}
}
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.transport.dispather;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Dispather;
import com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler;
/**
* @author chao.liuc
......@@ -28,8 +29,23 @@ import com.alibaba.dubbo.remoting.Dispather;
public class ChannelHandlers {
public static ChannelHandler wrap(ChannelHandler handler, URL url){
return ExtensionLoader.getExtensionLoader(Dispather.class)
.getAdaptiveExtension().dispath(handler, url);
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandlers() {}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispather.class)
.getAdaptiveExtension().dispath(handler, url));
}
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
}
\ No newline at end of file
......@@ -74,14 +74,6 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
@SuppressWarnings("deprecation")
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Request && ((Request)message).isHeartbeat()){
Request req = (Request) message;
if (req.isTwoWay()){
Response res = new Response(req.getId(),req.getVersion());
res.setHeartbeat(true);
channel.send(res);
}
}
handler.received(channel, message);
}
......@@ -105,12 +97,4 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate {
return url;
}
protected final boolean isHeartbeatResponse(Object message) {
return (message instanceof Response) && ((Response)message).isHeartbeat();
}
protected void setReadTimestamp(Channel channel) {
channel.setAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP, System.currentTimeMillis());
}
}
\ No newline at end of file
......@@ -52,20 +52,11 @@ public class AllChannelHandler extends WrappedChannelHandler {
}
public void received(Channel channel, Object message) throws RemotingException {
//FIXME 包的依赖顺序有问题
if (message instanceof Request && ((Request)message).isEvent()){
super.received(channel, message);
return;
}
if (!isHeartbeatResponse(message)) {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
} else {
setReadTimestamp(channel);
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
......
......@@ -69,24 +69,14 @@ public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
}
public void received(Channel channel, Object message) throws RemotingException {
//FIXME 包的依赖顺序有问题
if (message instanceof Request && ((Request)message).isEvent()){
super.received(channel, message);
return;
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
if (!isHeartbeatResponse(message)) {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
} else {
setReadTimestamp(channel);
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
......
......@@ -39,16 +39,7 @@ public class ExecutionChannelHandler extends WrappedChannelHandler {
}
public void received(Channel channel, Object message) throws RemotingException {
//FIXME 包的依赖顺序有问题
if (message instanceof Request && ((Request)message).isEvent()){
super.received(channel, message);
return;
}
if (!isHeartbeatResponse(message)) {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} else {
setReadTimestamp(channel);
}
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
......
......@@ -34,24 +34,14 @@ public class MessageOnlyChannelHandler extends WrappedChannelHandler {
}
public void received(Channel channel, Object message) throws RemotingException {
//FIXME 包的依赖顺序有问题
if (message instanceof Request && ((Request)message).isEvent()){
super.received(channel, message);
return;
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
if (!isHeartbeatResponse(message)) {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
} else {
setReadTimestamp(channel);
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
......
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.handler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.dubbo.remoting.ExecutionException;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.Response;
import com.alibaba.dubbo.remoting.transport.dispather.connection.ConnectionOrderedChannelHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.alibaba.dubbo.remoting.ExecutionException;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.Response;
import com.alibaba.dubbo.remoting.transport.dispather.connection.ConnectionOrderedChannelHandler;
......@@ -105,29 +106,30 @@ public class ConnectChannelHandlerTest extends WrappedChannelHandlerTest{
executor = (ThreadPoolExecutor)getField(handler, "executor", 1);
executor.shutdown();
handler.received(new MockedChannel(), "");
}
/**
* 事件不通过线程池,直接在IO上执行
*/
@SuppressWarnings("deprecation")
@Test
public void test_Received_Event_invoke_direct() throws RemotingException{
handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
ThreadPoolExecutor executor = (ThreadPoolExecutor)getField(handler, "SHARED_EXECUTOR", 1);
executor.shutdown();
executor = (ThreadPoolExecutor)getField(handler, "executor", 1);
executor.shutdown();
Request req = new Request();
req.setHeartbeat(true);
final AtomicInteger count = new AtomicInteger(0);
handler.received(new MockedChannel(){
@Override
public void send(Object message) throws RemotingException {
Assert.assertEquals("response.heartbeat", true, ((Response)message).isHeartbeat());
count.incrementAndGet();
}
}, req);
Assert.assertEquals("channel.send must be invoke", 1, count.get());
}
/**
* 事件不通过线程池,直接在IO上执行
*/
@SuppressWarnings("deprecation")
@Ignore("Heartbeat is processed in HeartbeatHandler not WrappedChannelHandler.")
@Test
public void test_Received_Event_invoke_direct() throws RemotingException{
handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
ThreadPoolExecutor executor = (ThreadPoolExecutor)getField(handler, "SHARED_EXECUTOR", 1);
executor.shutdown();
executor = (ThreadPoolExecutor)getField(handler, "executor", 1);
executor.shutdown();
Request req = new Request();
req.setHeartbeat(true);
final AtomicInteger count = new AtomicInteger(0);
handler.received(new MockedChannel(){
@Override
public void send(Object message) throws RemotingException {
Assert.assertEquals("response.heartbeat", true, ((Response)message).isHeartbeat());
count.incrementAndGet();
}
}, req);
Assert.assertEquals("channel.send must be invoke", 1, count.get());
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册