From b575549c4cad1bc82bac2f9527ea5fa1c18ec7d3 Mon Sep 17 00:00:00 2001 From: kimi Date: Sat, 30 Jun 2012 20:49:03 +0800 Subject: [PATCH] =?UTF-8?q?DUBBO-424=20Heartbeat=20response=E4=B8=8D?= =?UTF-8?q?=E5=BA=94=E8=AF=A5=E5=9C=A8=E4=B8=9A=E5=8A=A1=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/header/HeaderExchangeHandler.java | 4 +- .../support/header/HeartbeatHandler.java | 124 +++++++++++++++++- .../transport/dispather/ChannelHandlers.java | 52 +++++--- .../dispather/WrappedChannelHandler.java | 16 --- .../dispather/all/AllChannelHandler.java | 19 +-- .../ConnectionOrderedChannelHandler.java | 24 +--- .../execution/ExecutionChannelHandler.java | 11 +- .../message/MessageOnlyChannelHandler.java | 24 +--- .../handler/ConnectChannelHandlerTest.java | 106 +++++++-------- 9 files changed, 233 insertions(+), 147 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java index 73db73e40..d47d46881 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeaderExchangeHandler.java @@ -44,9 +44,9 @@ public class HeaderExchangeHandler implements ChannelHandlerDelegate { protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class); - public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP"; + public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP; - public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP"; + public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP; private final ExchangeHandler handler; diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeartbeatHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeartbeatHandler.java index 8a7aaca61..a9239581a 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeartbeatHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/support/header/HeartbeatHandler.java @@ -1,8 +1,130 @@ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.alibaba.dubbo.remoting.exchange.support.header; +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.ChannelHandler; +import com.alibaba.dubbo.remoting.RemotingException; +import com.alibaba.dubbo.remoting.exchange.Request; +import com.alibaba.dubbo.remoting.exchange.Response; +import com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate; + /** * @author kimi */ -public class HeartbeatHandler { +public class HeartbeatHandler implements ChannelHandlerDelegate { + + static final Logger log = LoggerFactory.getLogger(HeartbeatHandler.class); + + public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP"; + + public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP"; + + private ChannelHandler handler; + + public HeartbeatHandler(ChannelHandler handler) { + if (handler == null) { + throw new IllegalArgumentException("handler == null"); + } + this.handler = handler; + } + + public ChannelHandler getHandler() { + if (handler instanceof ChannelHandlerDelegate) { + return ((ChannelHandlerDelegate)handler).getHandler(); + } + return handler; + } + + public void connected(Channel channel) throws RemotingException { + setReadTimestamp(channel); + setWriteTimestamp(channel); + handler.connected(channel); + } + + public void disconnected(Channel channel) throws RemotingException { + clearReadTimestamp(channel); + clearWriteTimestamp(channel); + handler.disconnected(channel); + } + + public void sent(Channel channel, Object message) throws RemotingException { + setWriteTimestamp(channel); + handler.sent(channel, message); + } + + public void received(Channel channel, Object message) throws RemotingException { + setReadTimestamp(channel); + if (isHeartbeatRequest(message)) { + Request req = (Request) message; + if (req.isTwoWay()) { + Response res = new Response(req.getId(), req.getVersion()); + res.setEvent(Response.HEARTBEAT_EVENT); + channel.send(res); + if (log.isDebugEnabled()) { + log.debug( + new StringBuilder(32) + .append("Receive heartbeat request and send heartbeat in thread ") + .append(Thread.currentThread().getName()) + .toString()); + } + } + return; + } + if (isHeartbeatResponse(message)) { + if (log.isDebugEnabled()) { + log.debug( + new StringBuilder(32) + .append("Receive heartbeat response in thread ") + .append(Thread.currentThread().getName()) + .toString()); + } + return; + } + handler.received(channel, message); + } + + public void caught(Channel channel, Throwable exception) throws RemotingException { + handler.caught(channel, exception); + } + + private void setReadTimestamp(Channel channel) { + channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); + } + + private void setWriteTimestamp(Channel channel) { + channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); + } + + private void clearReadTimestamp(Channel channel) { + channel.removeAttribute(KEY_READ_TIMESTAMP); + } + + private void clearWriteTimestamp(Channel channel) { + channel.removeAttribute(KEY_WRITE_TIMESTAMP); + } + + private boolean isHeartbeatRequest(Object message) { + return message instanceof Request && ((Request) message).isHeartbeat(); + } + private boolean isHeartbeatResponse(Object message) { + return message instanceof Response && ((Response)message).isHeartbeat(); + } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/ChannelHandlers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/ChannelHandlers.java index f733a1a0c..9263de82c 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/ChannelHandlers.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/ChannelHandlers.java @@ -1,25 +1,26 @@ -/* - * Copyright 1999-2011 Alibaba Group. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alibaba.dubbo.remoting.transport.dispather; import com.alibaba.dubbo.common.URL; -import com.alibaba.dubbo.common.extension.ExtensionLoader; +import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.remoting.ChannelHandler; import com.alibaba.dubbo.remoting.Dispather; +import com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler; /** * @author chao.liuc @@ -28,8 +29,23 @@ import com.alibaba.dubbo.remoting.Dispather; public class ChannelHandlers { public static ChannelHandler wrap(ChannelHandler handler, URL url){ - return ExtensionLoader.getExtensionLoader(Dispather.class) - .getAdaptiveExtension().dispath(handler, url); + return ChannelHandlers.getInstance().wrapInternal(handler, url); + } + + protected ChannelHandlers() {} + + protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { + return new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispather.class) + .getAdaptiveExtension().dispath(handler, url)); } + private static ChannelHandlers INSTANCE = new ChannelHandlers(); + + protected static ChannelHandlers getInstance() { + return INSTANCE; + } + + static void setTestingChannelHandlers(ChannelHandlers instance) { + INSTANCE = instance; + } } \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/WrappedChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/WrappedChannelHandler.java index e82f6fbfd..b75849ecb 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/WrappedChannelHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/WrappedChannelHandler.java @@ -74,14 +74,6 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate { @SuppressWarnings("deprecation") public void received(Channel channel, Object message) throws RemotingException { - if (message instanceof Request && ((Request)message).isHeartbeat()){ - Request req = (Request) message; - if (req.isTwoWay()){ - Response res = new Response(req.getId(),req.getVersion()); - res.setHeartbeat(true); - channel.send(res); - } - } handler.received(channel, message); } @@ -105,12 +97,4 @@ public class WrappedChannelHandler implements ChannelHandlerDelegate { return url; } - protected final boolean isHeartbeatResponse(Object message) { - return (message instanceof Response) && ((Response)message).isHeartbeat(); - } - - protected void setReadTimestamp(Channel channel) { - channel.setAttribute( - HeaderExchangeHandler.KEY_READ_TIMESTAMP, System.currentTimeMillis()); - } } \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/all/AllChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/all/AllChannelHandler.java index 9f54d5ad4..9715bac62 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/all/AllChannelHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/all/AllChannelHandler.java @@ -52,20 +52,11 @@ public class AllChannelHandler extends WrappedChannelHandler { } public void received(Channel channel, Object message) throws RemotingException { - //FIXME 包的依赖顺序有问题 - if (message instanceof Request && ((Request)message).isEvent()){ - super.received(channel, message); - return; - } - if (!isHeartbeatResponse(message)) { - ExecutorService cexecutor = getExecutorService(); - try { - cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); - } catch (Throwable t) { - throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); - } - } else { - setReadTimestamp(channel); + ExecutorService cexecutor = getExecutorService(); + try { + cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); + } catch (Throwable t) { + throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/connection/ConnectionOrderedChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/connection/ConnectionOrderedChannelHandler.java index 62d6e6fc5..1983c5f67 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/connection/ConnectionOrderedChannelHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/connection/ConnectionOrderedChannelHandler.java @@ -69,24 +69,14 @@ public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { } public void received(Channel channel, Object message) throws RemotingException { - //FIXME 包的依赖顺序有问题 - if (message instanceof Request && ((Request)message).isEvent()){ - super.received(channel, message); - return; + ExecutorService cexecutor = executor; + if (cexecutor == null || cexecutor.isShutdown()) { + cexecutor = SHARED_EXECUTOR; } - - if (!isHeartbeatResponse(message)) { - ExecutorService cexecutor = executor; - if (cexecutor == null || cexecutor.isShutdown()) { - cexecutor = SHARED_EXECUTOR; - } - try { - cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); - } catch (Throwable t) { - throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); - } - } else { - setReadTimestamp(channel); + try { + cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); + } catch (Throwable t) { + throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/execution/ExecutionChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/execution/ExecutionChannelHandler.java index decad7862..0046c7765 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/execution/ExecutionChannelHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/execution/ExecutionChannelHandler.java @@ -39,16 +39,7 @@ public class ExecutionChannelHandler extends WrappedChannelHandler { } public void received(Channel channel, Object message) throws RemotingException { - //FIXME 包的依赖顺序有问题 - if (message instanceof Request && ((Request)message).isEvent()){ - super.received(channel, message); - return; - } - if (!isHeartbeatResponse(message)) { - executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); - } else { - setReadTimestamp(channel); - } + executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } public void caught(Channel channel, Throwable exception) throws RemotingException { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/message/MessageOnlyChannelHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/message/MessageOnlyChannelHandler.java index c333b3b1a..0ff45378d 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/message/MessageOnlyChannelHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/transport/dispather/message/MessageOnlyChannelHandler.java @@ -34,24 +34,14 @@ public class MessageOnlyChannelHandler extends WrappedChannelHandler { } public void received(Channel channel, Object message) throws RemotingException { - //FIXME 包的依赖顺序有问题 - if (message instanceof Request && ((Request)message).isEvent()){ - super.received(channel, message); - return; + ExecutorService cexecutor = executor; + if (cexecutor == null || cexecutor.isShutdown()) { + cexecutor = SHARED_EXECUTOR; } - - if (!isHeartbeatResponse(message)) { - ExecutorService cexecutor = executor; - if (cexecutor == null || cexecutor.isShutdown()) { - cexecutor = SHARED_EXECUTOR; - } - try { - cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); - } catch (Throwable t) { - throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); - } - } else { - setReadTimestamp(channel); + try { + cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); + } catch (Throwable t) { + throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/handler/ConnectChannelHandlerTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/handler/ConnectChannelHandlerTest.java index 2b90b6c3c..fae3dc832 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/handler/ConnectChannelHandlerTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/com/alibaba/dubbo/remoting/handler/ConnectChannelHandlerTest.java @@ -1,33 +1,34 @@ -/* - * Copyright 1999-2011 Alibaba Group. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +/* + * Copyright 1999-2011 Alibaba Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alibaba.dubbo.remoting.handler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; - -import junit.framework.Assert; - -import org.junit.Before; -import org.junit.Test; - -import com.alibaba.dubbo.remoting.ExecutionException; -import com.alibaba.dubbo.remoting.RemotingException; -import com.alibaba.dubbo.remoting.exchange.Request; -import com.alibaba.dubbo.remoting.exchange.Response; -import com.alibaba.dubbo.remoting.transport.dispather.connection.ConnectionOrderedChannelHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import com.alibaba.dubbo.remoting.ExecutionException; +import com.alibaba.dubbo.remoting.RemotingException; +import com.alibaba.dubbo.remoting.exchange.Request; +import com.alibaba.dubbo.remoting.exchange.Response; +import com.alibaba.dubbo.remoting.transport.dispather.connection.ConnectionOrderedChannelHandler; @@ -105,29 +106,30 @@ public class ConnectChannelHandlerTest extends WrappedChannelHandlerTest{ executor = (ThreadPoolExecutor)getField(handler, "executor", 1); executor.shutdown(); handler.received(new MockedChannel(), ""); - } - - /** - * 事件不通过线程池,直接在IO上执行 - */ - @SuppressWarnings("deprecation") - @Test - public void test_Received_Event_invoke_direct() throws RemotingException{ - handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url); - ThreadPoolExecutor executor = (ThreadPoolExecutor)getField(handler, "SHARED_EXECUTOR", 1); - executor.shutdown(); - executor = (ThreadPoolExecutor)getField(handler, "executor", 1); - executor.shutdown(); - Request req = new Request(); - req.setHeartbeat(true); - final AtomicInteger count = new AtomicInteger(0); - handler.received(new MockedChannel(){ - @Override - public void send(Object message) throws RemotingException { - Assert.assertEquals("response.heartbeat", true, ((Response)message).isHeartbeat()); - count.incrementAndGet(); - } - }, req); - Assert.assertEquals("channel.send must be invoke", 1, count.get()); + } + + /** + * 事件不通过线程池,直接在IO上执行 + */ + @SuppressWarnings("deprecation") + @Ignore("Heartbeat is processed in HeartbeatHandler not WrappedChannelHandler.") + @Test + public void test_Received_Event_invoke_direct() throws RemotingException{ + handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url); + ThreadPoolExecutor executor = (ThreadPoolExecutor)getField(handler, "SHARED_EXECUTOR", 1); + executor.shutdown(); + executor = (ThreadPoolExecutor)getField(handler, "executor", 1); + executor.shutdown(); + Request req = new Request(); + req.setHeartbeat(true); + final AtomicInteger count = new AtomicInteger(0); + handler.received(new MockedChannel(){ + @Override + public void send(Object message) throws RemotingException { + Assert.assertEquals("response.heartbeat", true, ((Response)message).isHeartbeat()); + count.incrementAndGet(); + } + }, req); + Assert.assertEquals("channel.send must be invoke", 1, count.get()); } } \ No newline at end of file -- GitLab